gerrit.simantics Code Review - simantics/platform.git/blob - bundles/org.simantics.db.testing/src/org/simantics/db/testing/common/DatabaseProxy.java
Added missing org.simantics.db.{tests,testing} plug-ins.
[simantics/platform.git] / bundles / org.simantics.db.testing / src / org / simantics / db / testing / common / DatabaseProxy.java
1 package org.simantics.db.testing.common;
2 /*******************************************************************************
3  * Copyright (c) 2007, 2010 Association for Decentralized Information Management
4  * in Industry THTH ry.
5  * All rights reserved. This program and the accompanying materials
6  * are made available under the terms of the Eclipse Public License v1.0
7  * which accompanies this distribution, and is available at
8  * http://www.eclipse.org/legal/epl-v10.html
9  *
10  * Contributors:
11  *     VTT Technical Research Centre of Finland - initial API and implementation
12  *******************************************************************************/
13
14
15 import java.io.BufferedReader;
16 import java.io.IOException;
17 import java.io.InputStreamReader;
18 import java.io.PrintWriter;
19 import java.net.InetSocketAddress;
20 import java.net.ServerSocket;
21 import java.net.Socket;
22 import java.util.HashMap;
23 import java.util.Map;
24
/**
 * A small TCP proxy that sits between a client and a database server.
 *
 * <p>The proxy listens on a fixed port. Each accepted client must first send one
 * line of the form {@code CONNECT\tDatabaseIdentifier}. The identifier is looked
 * up in a static table of {@code host:port} addresses; on success the proxy
 * answers {@code OK} and then copies bytes in both directions until either side
 * closes. On failure it answers with a single {@code ERROR ...} line and closes
 * the connection.
 *
 * <p>Every client connection is served on its own thread, and each proxied
 * connection uses two additional threads (one per direction).
 */
public class DatabaseProxy implements Runnable {

    /**
     * Debug switch: when {@code true}, forwarded data is written one byte at a
     * time with a pause after each byte instead of in whole chunks.
     */
    private static final boolean SMALL_PACKETS = false;

    /** TCP port the proxy listens on. */
    private static final int PORT = 8891;

    /** Size in bytes of the buffer used when copying data between sockets. */
    private static final int BUFFER_SIZE = 256;

    /** Pause after each single-byte write in {@link #SMALL_PACKETS} mode, in milliseconds. */
    private static final long SMALL_PACKET_DELAY_MS = 100;

    /** How long {@link #proxy(Socket, Socket)} waits between liveness checks, in milliseconds. */
    private static final long PROXY_POLL_MS = 1000;

    /** Set by {@link #stop()}; read by the accept loop on another thread, hence volatile. */
    private volatile boolean interrupted = false;

    /** Maps a database identifier sent by the client to a {@code host:port} address. */
    private final Map<String, String> databases = new HashMap<String, String>();

    /** Listening socket; {@code null} until {@link #init()} has succeeded. */
    private volatile ServerSocket service = null;

    /**
     * Initializes database proxy with static mappings.
     */
    public DatabaseProxy() {
        databases.put("/public/", "192.168.3.11:9101");
        databases.put("FooBar", "localhost:6671");
    }

    /**
     * Initializes the proxy with a single mapping from the identifier
     * {@code /test/} to the given database address.
     *
     * @param host host name or address of the database server
     * @param port port of the database server
     */
    public DatabaseProxy(String host, int port) {
        databases.put("/test/", host + ":" + port);
    }

    /**
     * Opens the listening socket and starts the accept loop on a new thread.
     * The socket is bound before this method returns. If the port cannot be
     * opened, the failure has already been reported by {@link #init()} and no
     * thread is started.
     */
    public void start() {
        Thread t = new Thread(this);
        init();
        if (service == null)
            return;
        t.start();
    }

    /**
     * Requests the accept loop to terminate and closes the listening socket.
     */
    public void stop() {
        interrupted = true;
        // We have to interrupt the blocking accept call.
        // Closing the server socket is one way of doing it.
        ServerSocket server = service;
        if (server != null) {
            try {
                server.close();
            } catch (IOException ignored) {
                // Best effort: the socket is being discarded anyway.
            }
        }
    }

    /**
     * @return the TCP port this proxy listens on
     */
    public int getPort() {
        return PORT;
    }

    /**
     * Opens the listening socket on {@link #getPort()}. On failure the stack
     * trace is printed and the proxy stays without a listening socket.
     */
    public void init() {
        try {
            service = new ServerSocket(PORT);
        } catch (IOException e) {
            e.printStackTrace();
            return;
        }
    }

    /**
     * Accept loop: accepts client connections until {@link #stop()} is called
     * and hands each one to its own thread. {@link #init()} must have succeeded
     * first; otherwise this method reports the problem and returns immediately.
     */
    @Override
    public void run() {
        final ServerSocket server = service;
        if (server == null) {
            // init() failed or was never called; there is nothing to accept on.
            System.err.println("Database proxy is not listening, cannot accept connections");
            return;
        }

        System.out.println("Accepting connections");

        int connectionNumber = 0;
        while (!interrupted) {
            try {
                final Socket socket = server.accept();
                String connectionName = "Connection" + ++connectionNumber;
                System.out.println(connectionName + ": Client connection accepted.");

                Runnable runnable = new Runnable() {
                    @Override
                    public void run() {
                        try {
                            serve(socket);
                        } catch (IOException e) {
                            e.printStackTrace();
                        } finally {
                            // Always release the client socket, also when serve() fails
                            // with an unchecked exception.
                            try {
                                socket.close();
                            } catch (IOException e) {
                                e.printStackTrace();
                            }
                        }
                    }
                };
                Thread t = new Thread(runnable, connectionName);
                t.start();
            } catch (IOException e) {
                if (interrupted)
                    break; // Blocking accept call intentionally interrupted by closing the socket.
                e.printStackTrace();
                if (server.isClosed())
                    break; // accept() can never succeed again; do not spin on the error.
            }
        }

        // Shutdown service
        try {
            server.close();
        } catch (IOException ignored) {
            // Nothing useful can be done if closing fails during shutdown.
        }
    }

    /**
     * Serves socket connection. Waits for a connection line from the client and parses it.
     * The connection line must be in form "CONNECT\tDatabaseIdentifier"
     * If the database name is valid, creates a connection to the database and works as a proxy
     * between the client and the database.
     *
     * <p>The caller remains responsible for closing {@code socket}.
     *
     * @param socket the accepted client connection
     * @throws IOException if reading the request or closing the database connection fails
     * @throws IllegalArgumentException if the configured address of the requested
     *         database is not of the form {@code host:port}
     */
    private void serve(Socket socket) throws IOException {
        String connectionName = Thread.currentThread().getName();

        // NOTE(review): BufferedReader may read ahead past the first line. Bytes a client
        // sends before it has received the reply could end up in this buffer and would then
        // not be forwarded to the database - confirm that clients wait for the reply.
        BufferedReader input = new BufferedReader(new InputStreamReader(socket.getInputStream()));
        PrintWriter out = new PrintWriter(socket.getOutputStream(), true);

        String line = input.readLine();
        if (line == null) {
            // Client closed the connection without sending anything.
            System.out.println(connectionName + ": serve() Connection closed before a connection string was received");
            return;
        }
        String[] columns = line.split("\t");
        if (columns.length < 2) {
            System.out.println(connectionName + ": serve() Invalid connection string [1] '" + line + "'");
            reply(out, "ERROR Invalid connection string");
            return;
        }
        String command = columns[0];
        String param = columns[1];

        // CONNECT is the only supported command
        if (!"CONNECT".equals(command)) {
            System.out.println(connectionName + ": serve() Invalid command '" + command + "'");
            reply(out, "ERROR Invalid command '" + command + "'");
            return;
        }

        // The second parameter is directly mapped to database name. In future some parsing is required.
        String address = databases.get(param);
        if (address == null) {
            System.out.println(connectionName + ": serve() Invalid database '" + param + "'");
            reply(out, "ERROR Invalid database '" + param + "'");
            return;
        }
        String[] split = address.split(":");
        if (split.length != 2)
            throw new IllegalArgumentException(connectionName + ": address does not contain a port, missing ':' character");
        InetSocketAddress dbAddress = new InetSocketAddress(split[0], Integer.parseInt(split[1]));

        Socket dbSocket = new Socket();
        try {
            try {
                dbSocket.connect(dbAddress);
            } catch (IOException e) {
                System.out.println(connectionName + ":serve() Couldn't connect to database '" + param + "'");
                reply(out, "ERROR Couldn't connect to database '" + param + "'");
                return;
            }

            reply(out, "OK");
            proxy(dbSocket, socket);
        } finally {
            // Closing the database socket also unblocks a forwarding thread that is
            // still reading from it.
            dbSocket.close();
        }
    }

    /**
     * Sends a single reply line to the client and flushes it. The explicit
     * flush is required: the writer's auto-flush is only triggered by
     * {@code println}/{@code printf}/{@code format}, not by {@code write}.
     *
     * @param out writer on the client socket
     * @param message reply text without the trailing newline
     */
    private static void reply(PrintWriter out, String message) {
        out.write(message + "\n");
        out.flush();
    }

    /**
     * Proxies data from socket a to socket b and vice versa
     * This method blocks until one of the directions has finished (a socket was
     * closed or an I/O error occurred) or the calling thread is interrupted.
     * The other direction keeps running until the sockets are closed by the caller.
     *
     * @param a database server side socket
     * @param b client side socket
     */
    private void proxy(final Socket a, final Socket b) {
        // A -> B
        Runnable ab = new Runnable() {
            @Override
            public void run() {
                pump(a, b);
            }
        };

        // B -> A
        Runnable ba = new Runnable() {
            @Override
            public void run() {
                pump(b, a);
            }
        };

        String t = Thread.currentThread().getName();
        Thread tab = new Thread(ab, t + " server->client");
        Thread tba = new Thread(ba, t + " client->server");

        tab.start();
        tba.start();

        try {
            while (tba.isAlive() && tab.isAlive()) {
                // Returns early when the server->client direction ends; the
                // other direction is noticed on the next liveness check.
                tab.join(PROXY_POLL_MS);
            }
        } catch (InterruptedException e) {
            // Preserve the interrupt for the caller and stop waiting.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Copies bytes from one socket to the other until the source reaches end
     * of stream, either socket is closed, an I/O error occurs or the thread is
     * interrupted while pausing in {@link #SMALL_PACKETS} mode.
     *
     * @param from socket to read from
     * @param to socket to write to
     */
    private static void pump(Socket from, Socket to) {
        byte[] buffer = new byte[BUFFER_SIZE];
        try {
            // isConnected() stays true after close(), so isClosed() is the meaningful check.
            while (!from.isClosed() && !to.isClosed()) {
                int n = from.getInputStream().read(buffer);
                if (n < 1)
                    break;
                if (SMALL_PACKETS) {
                    for (int i = 0; i < n; ++i) {
                        to.getOutputStream().write(buffer, i, 1);
                        to.getOutputStream().flush();
                        try {
                            Thread.sleep(SMALL_PACKET_DELAY_MS);
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                            return;
                        }
                    }
                } else {
                    to.getOutputStream().write(buffer, 0, n);
                    to.getOutputStream().flush();
                }
            }
        } catch (IOException e) {
            // Expected when either socket is closed while forwarding; ends this direction.
            return; // for debugging
        }
    }

    /**
     * Runs the proxy with the default static mappings on the calling thread.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        DatabaseProxy proxy = new DatabaseProxy();
        proxy.init();
        proxy.run();
    }

}