]> gerrit.simantics Code Review - simantics/platform.git/blobdiff - bundles/org.simantics.db.testing/src/org/simantics/db/testing/common/DatabaseProxy.java
Added missing org.simantics.db.{tests,testing} plug-ins.
[simantics/platform.git] / bundles / org.simantics.db.testing / src / org / simantics / db / testing / common / DatabaseProxy.java
diff --git a/bundles/org.simantics.db.testing/src/org/simantics/db/testing/common/DatabaseProxy.java b/bundles/org.simantics.db.testing/src/org/simantics/db/testing/common/DatabaseProxy.java
new file mode 100644 (file)
index 0000000..5e2a709
--- /dev/null
@@ -0,0 +1,274 @@
+package org.simantics.db.testing.common;
+/*******************************************************************************
+ * Copyright (c) 2007, 2010 Association for Decentralized Information Management
+ * in Industry THTH ry.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ *     VTT Technical Research Centre of Finland - initial API and implementation
+ *******************************************************************************/
+
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.PrintWriter;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.util.HashMap;
+import java.util.Map;
+
+public class DatabaseProxy implements Runnable {
+
+    private static boolean SMALL_PACKETS = false;
+       private static int PORT = 8891;
+       private boolean interrupted = false;
+       private Map<String, String> databases = new HashMap<String, String>();
+       
+       private ServerSocket service = null;
+
+       /**
+        * Initializes database proxy with static mappings.
+        * 
+        */
+       public DatabaseProxy() {
+               databases.put("/public/", "192.168.3.11:9101");
+               databases.put("FooBar", "localhost:6671");
+       }
+       
+       public DatabaseProxy(String host, int port) {
+               databases.put("/test/", host+":"+port);         
+       }
+       
+       public void start() {
+               Thread t = new Thread(this);
+               init();
+               t.start();
+       }
+       
+       public void stop() {
+               interrupted = true;
+               // We have to interrupt the the blocking accept call.
+               // This is one way of doing it.
+        if(service != null)
+            try {
+                service.close();
+            } catch (IOException e) {
+            }
+       }
+       
+       public int getPort() {
+               return PORT;
+       }
+       
+       public void init() {
+               try {
+                       service = new ServerSocket(PORT);
+               } catch (IOException e) {
+                       e.printStackTrace();
+                       return;
+               }
+       }
+       
+       public void run() {
+               System.out.println("Accepting connections");
+               
+        int connectionNumber = 0;
+               while(!interrupted) {
+                       try {
+                               final Socket socket = service.accept();
+                String connectionName = "Connection" + ++connectionNumber;
+                System.out.println(connectionName + ": Client connection accepted.");
+
+                               Runnable runnable = new Runnable() {
+                                       @Override
+                                       public void run() {
+                                               try {
+                                                       serve(socket);
+                                                       socket.close();
+                                               } catch(IOException e) {
+                                                       e.printStackTrace();
+                                               }
+                                       }
+                               };
+                               Thread t = new Thread(runnable, connectionName);
+                               t.start();
+                       } catch (IOException e) {
+                if (interrupted)
+                    break; // Blocking accept call intentionally interrupted by closing the socket.
+                               e.printStackTrace();
+                       }
+               }
+               
+               // Shutdown service
+               try {
+                       if(service != null)
+                               service.close();
+               } catch (IOException ignore) {
+               }
+       }
+       
+       /**
+        * Serves socket connection. Waits for a connection line from the client and parses it.
+        * The connection line must be in form "CONNECT\tDatabaseIdentifier"
+        * If the database name is valid, creates a connection to the database and works as a proxy 
+        * between the client and the database.
+        * 
+        * @param socket
+        * @throws IOException
+        */
+       private void serve(Socket socket) throws IOException {
+       String connectionName = Thread.currentThread().getName();
+
+               BufferedReader input = new BufferedReader(new InputStreamReader(socket.getInputStream()));
+               PrintWriter out = new PrintWriter(socket.getOutputStream(), true);
+
+               String line = input.readLine();
+               String columns[] = line.split("\t");
+               if(columns.length < 2) {
+                       System.out.println(connectionName + ": serve() Invalid connection string [1] '"+line+"'");
+                       out.write("ERROR Invalid connection string\n");
+                       return;
+               }
+               String command = columns[0];
+               String param = columns[1];
+               
+               /**
+                * CONNECT is the only supported command
+                */
+               if("CONNECT".equals(command)) {
+                       if(databases.containsKey(param) == false) {
+                               System.out.println(connectionName + ": serve() Invalid database '"+param+"'");
+                               out.write("ERROR Invalid database '"+param+"'\n");
+                               return;
+                       }
+                       // The second parameter is directly mapped to database name. In future some parsing is required.
+                       String address = databases.get(param);
+               String[] split = address.split(":");
+               if (split.length != 2)
+                   throw new IllegalArgumentException(connectionName+ ": address does not contain a port, missing ':' character");
+               InetSocketAddress dbAddress =  new InetSocketAddress(split[0], Integer.parseInt(split[1]));
+
+               Socket dbSocket = new Socket();
+               try {
+                       dbSocket.connect(dbAddress);
+               } catch(IOException e) {
+                               System.out.println(connectionName + ":serve() Couldn't connect to database '"+param+"'");
+                               out.write("ERROR Couldn't connect to database '"+param+"'\n");
+                               return;
+                       }
+                       
+                       out.write("OK\n");
+                       out.flush();
+                       proxy(dbSocket, socket);
+                       
+                       dbSocket.close();
+               } else {
+                       System.out.println(connectionName + ": serve() Invalid command '"+command+"'");
+                       out.write("ERROR Invalid command '"+command+"'\n");
+                       return;
+               }
+       }
+       
+       /**
+        * Proxies data from socket a to socket b and vice versa
+        * This method blocks until one of the sockets closes or exception is thrown.
+        * 
+        * @param a
+        * @param b
+        */
+       private void proxy(final Socket a, final Socket b) {            
+               // A -> B
+               Runnable ab = new Runnable() {
+                       @Override
+            public void run() {
+                try {
+                    byte buffer[] = new byte[256];
+                    while(a.isConnected() && b.isConnected()) {
+                        int n = a.getInputStream().read(buffer);
+                        if (n < 1)
+                            break;
+                        if (SMALL_PACKETS) {
+                            for (int i=0; i<n; ++i) {
+                                b.getOutputStream().write(buffer, i, 1);
+                                b.getOutputStream().flush();
+                                try {
+                                    Thread.sleep(100); // ms
+                                } catch (InterruptedException e) {
+                                    // TODO Auto-generated catch block
+                                    e.printStackTrace();
+                                }
+                            }
+                        } else {
+                            b.getOutputStream().write(buffer, 0, n);
+                            b.getOutputStream().flush();
+                        }
+                    }
+                } catch(IOException e) {
+                    return; // for debugging
+                }
+            }
+               };
+               
+               // B -> A
+               Runnable ba = new Runnable() {
+                       @Override
+            public void run() {
+                try {
+                    byte buffer[] = new byte[256];
+                    while(b.isConnected() && a.isConnected()) {
+                        int n = b.getInputStream().read(buffer);
+                        if (n < 1)
+                            break;
+                        if (SMALL_PACKETS) {
+                            for (int i=0; i<n; ++i) {
+                                a.getOutputStream().write(buffer, i, 1);
+                                try {
+                                    Thread.sleep(100); // ms
+                                } catch (InterruptedException e) {
+                                    e.printStackTrace();
+                                }
+                                a.getOutputStream().flush();
+                            }
+                        } else {
+                            a.getOutputStream().write(buffer, 0, n);
+                            a.getOutputStream().flush();
+                        }
+                    }
+                } catch(IOException e) {
+                    return; // for debugging
+                }               
+            }
+               };
+               
+        String t = Thread.currentThread().getName();
+               Thread tab = new Thread(ab, t + " server->client");
+               Thread tba = new Thread(ba, t + " client->server");
+               
+               tab.start();
+               tba.start();
+               
+               try {
+                       while(tba.isAlive() && tab.isAlive()) {
+                               Thread.sleep(1000);
+                       }
+               } catch (InterruptedException e) {
+                       // TODO Auto-generated catch block
+                       e.printStackTrace();
+               }
+       }
+       
+       /**
+        * @param args
+        */
+       public static void main(String[] args) {
+               DatabaseProxy proxy = new DatabaseProxy();
+               proxy.init();
+               proxy.run();
+       }
+
+}