1 package org.rosuda.REngine.Rserve;
3 // JRclient library - client interface to Rserve, see http://www.rosuda.org/Rserve/
4 // Copyright (C) 2004-08 Simon Urbanek
5 // --- for licensing information see LICENSE file in the original JRclient distribution ---
10 import org.rosuda.REngine.*;
11 import org.rosuda.REngine.Rserve.protocol.*;
13 /** class providing TCP/IP connection to an Rserve
16 public class RConnection extends REngine {
17 /** last error string */
18 String lastError=null;
20 boolean connected=false;
23 boolean authReq=false;
24 int authType=AT_plain;
31 /** This static variable specifies the character set used to encode string for transfer. Under normal circumstances there should be no reason for changing this variable. The default is UTF-8, which makes sure that 7-bit ASCII characters are sent in a backward-compatible fashion. Currently (Rserve 0.1-7) there is no further conversion on Rserve's side, i.e. the strings are passed to R without re-coding. If necessary the setting should be changed <u>before</u> connecting to the Rserve in case later Rserves will provide a possibility of setting the encoding during the handshake. */
32 public static String transferCharset="UTF-8";
34 /** authorization type: plain text */
35 public static final int AT_plain = 0;
36 /** authorization type: unix crypt */
37 public static final int AT_crypt = 1;
39 /** version of the server (as reported in IDstring just after Rsrv) */
40 protected int rsrvVersion;
42 /** make a new local connection on default port (6311) */
43 public RConnection() throws RserveException {
44 this("127.0.0.1",6311);
47 /** make a new connection to specified host on default port (6311)
48 @param host host name/IP
50 public RConnection(String host) throws RserveException {
54 /** make a new connection to specified host and given port.
55 * Make sure you check {@link #isConnected} to ensure the connection was successfully created.
56 * @param host host name/IP
57 * @param port TCP port
59 public RConnection(String host, int port) throws RserveException {
60 this(host, port, null);
63 /** restore a connection based on a previously detached session
64 * @param session detached session object */
65 RConnection(RSession session) throws RserveException {
66 this(null, 0, session);
69 RConnection(String host, int port, RSession session) throws RserveException {
71 if (connected) s.close();
73 } catch (Exception e) {
74 throw new RserveException(this,"Cannot connect: "+e.getMessage());
84 s=new Socket(host,port);
85 // disable Nagle's algorithm since we really want immediate replies
86 s.setTcpNoDelay(true);
87 } catch (Exception sce) {
88 throw new RserveException(this,"Cannot connect: "+sce.getMessage());
91 is=s.getInputStream();
92 os=s.getOutputStream();
93 } catch (Exception gse) {
94 throw new RserveException(this,"Cannot get io stream: "+gse.getMessage());
98 byte[] IDs=new byte[32];
102 } catch (Exception sre) {
103 throw new RserveException(this,"Error while receiving data: "+sre.getMessage());
107 throw new RserveException(this,"Handshake failed: expected 32 bytes header, got "+n);
109 String ids=new String(IDs);
110 if (ids.substring(0,4).compareTo("Rsrv")!=0)
111 throw new RserveException(this,"Handshake failed: Rsrv signature expected, but received \""+ids+"\" instead.");
113 rsrvVersion=Integer.parseInt(ids.substring(4,8));
114 } catch (Exception px) {}
115 // we support (knowingly) up to 103
117 throw new RserveException(this,"Handshake failed: The server uses more recent protocol than this client.");
118 if (ids.substring(8,12).compareTo("QAP1")!=0)
119 throw new RserveException(this,"Handshake failed: unupported transfer protocol ("+ids.substring(8,12)+"), I talk only QAP1.");
120 for (int i=12;i<32;i+=4) {
121 String attr=ids.substring(i,i+4);
122 if (attr.compareTo("ARpt")==0) {
123 if (!authReq) { // this method is only fallback when no other was specified
128 if (attr.compareTo("ARuc")==0) {
132 if (attr.charAt(0)=='K') {
133 Key=attr.substring(1,3);
136 } catch (RserveException innerX) {
137 try { s.close(); } catch (Exception ex01) {}; is=null; os=null; s=null;
140 } else { // we have a session to take care of
142 os.write(session.key,0,32);
143 } catch (Exception sre) {
144 throw new RserveException(this,"Error while sending session key: "+sre.getMessage());
146 rsrvVersion = session.rsrvVersion;
152 public void finalize() {
158 /** get server version as reported during the handshake.
159 @return server version as integer (Rsrv0100 will return 100) */
160 public int getServerVersion() {
164 /** closes current connection */
165 public boolean close() {
167 if (s != null) s.close();
170 } catch(Exception e) { };
174 public String voidEvalS(String cmd) {
178 } catch (RserveException e) {
179 return e.getMessage();
183 /** evaluates the given command, but does not fetch the result (useful for assignment
185 @param cmd command/expression string */
186 public void voidEval(String cmd) throws RserveException {
187 if (!connected || rt==null)
188 throw new RserveException(this,"Not connected");
189 RPacket rp=rt.request(RTalk.CMD_voidEval,cmd+"\n");
190 if (rp!=null && rp.isOk()) return;
191 throw new RserveException(this,"voidEval failed",rp);
194 /** evaluates the given command, detaches the session (see @link{detach()}) and closes connection while the command is being evaluted (requires Rserve 0.4+).
195 Note that a session cannot be attached again until the commad was successfully processed. Techincally the session is put into listening mode while the command is being evaluated but accept is called only after the command was evaluated. One commonly used techique to monitor detached working sessions is to use second connection to poll the status (e.g. create a temporary file and return the full path before detaching thus allowing new connections to read it).
196 @param cmd command/expression string
197 @return session object that can be use to attach back to the session once the command completed */
198 public RSession voidEvalDetach(String cmd) throws RserveException {
199 if (!connected || rt==null)
200 throw new RserveException(this,"Not connected");
201 RPacket rp=rt.request(RTalk.CMD_detachedVoidEval,cmd+"\n");
202 if (rp==null || !rp.isOk())
203 throw new RserveException(this,"detached void eval failed",rp);
204 RSession s = new RSession(this, rp);
209 REXP parseEvalResponse(RPacket rp) throws RserveException {
211 byte[] pc=rp.getCont();
212 if (rsrvVersion>100) { /* since 0101 eval responds correctly by using DT_SEXP type/len header which is 4 bytes long */
214 /* we should check parameter type (should be DT_SEXP) and fail if it's not */
215 if (pc[0]!=RTalk.DT_SEXP && pc[0]!=(RTalk.DT_SEXP|RTalk.DT_LARGE))
216 throw new RserveException(this,"Error while processing eval output: SEXP (type "+RTalk.DT_SEXP+") expected but found result type "+pc[0]+".");
217 if (pc[0]==(RTalk.DT_SEXP|RTalk.DT_LARGE))
218 rxo=8; // large data need skip of 8 bytes
219 /* warning: we are not checking or using the length - we assume that only the one SEXP is returned. This is true for the current CMD_eval implementation, but may not be in the future. */
223 REXPFactory rx=new REXPFactory();
224 rx.parseREXP(pc, rxo);
226 } catch (REXPMismatchException me) {
227 me.printStackTrace();
228 throw new RserveException(this, "Error when parsing response: "+me.getMessage());
234 /** evaluates the given command and retrieves the result
235 @param cmd command/expression string
236 @return R-xpression or <code>null</code> if an error occured */
237 public REXP eval(String cmd) throws RserveException {
238 if (!connected || rt==null)
239 throw new RserveException(this,"Not connected");
240 RPacket rp=rt.request(RTalk.CMD_eval,cmd+"\n");
241 if (rp!=null && rp.isOk())
242 return parseEvalResponse(rp);
243 throw new RserveException(this,"eval failed",rp);
246 /** assign a string value to a symbol in R. The symbol is created if it doesn't exist already.
247 @param sym symbol name. Currently assign uses CMD_setSEXP command of Rserve, i.e. the symbol value is NOT parsed. It is the responsibility of the user to make sure that the symbol name is valid in R (recall the difference between a symbol and an expression!). In fact R will always create the symbol, but it may not be accessible (examples: "bar\nfoo" or "bar$foo").
250 public void assign(String sym, String ct) throws RserveException {
251 if (!connected || rt==null)
252 throw new RserveException(this,"Not connected");
253 byte[] symn=sym.getBytes();
254 byte[] ctn=ct.getBytes();
255 int sl=symn.length+1;
257 if ((sl&3)>0) sl=(sl&0xfffffc)+4; // make sure the symbol length is divisible by 4
258 if ((cl&3)>0) cl=(cl&0xfffffc)+4; // make sure the content length is divisible by 4
259 byte[] rq=new byte[sl+4+cl+4];
261 for(ic=0;ic<symn.length;ic++) rq[ic+4]=symn[ic];
262 while (ic<sl) { rq[ic+4]=0; ic++; }
263 for(ic=0;ic<ctn.length;ic++) rq[ic+sl+8]=ctn[ic];
264 while (ic<cl) { rq[ic+sl+8]=0; ic++; }
265 RTalk.setHdr(RTalk.DT_STRING,sl,rq,0);
266 RTalk.setHdr(RTalk.DT_STRING,cl,rq,sl+4);
267 RPacket rp=rt.request(RTalk.CMD_setSEXP,rq);
268 if (rp!=null && rp.isOk()) return;
269 throw new RserveException(this,"assign failed",rp);
272 /** assign a content of a REXP to a symbol in R. The symbol is created if it doesn't exist already.
273 * @param sym symbol name. Currently assign uses CMD_setSEXP command of Rserve, i.e. the symbol value is NOT parsed. It is the responsibility of the user to make sure that the symbol name is valid in R (recall the difference between a symbol and an expression!). In fact R will always create the symbol, but it may not be accessible (examples: "bar\nfoo" or "bar$foo").
274 * @param rexp contents
276 public void assign(String sym, REXP rexp) throws RserveException {
277 if (!connected || rt==null)
278 throw new RserveException(this,"Not connected");
280 REXPFactory r = new REXPFactory(rexp);
281 int rl=r.getBinaryLength();
282 byte[] symn=sym.getBytes();
283 int sl=symn.length+1;
284 if ((sl&3)>0) sl=(sl&0xfffffc)+4; // make sure the symbol length is divisible by 4
285 byte[] rq=new byte[sl+rl+((rl>0xfffff0)?12:8)];
287 for(ic=0;ic<symn.length;ic++) rq[ic+4]=symn[ic];
288 while(ic<sl) { rq[ic+4]=0; ic++; }; // pad with 0
289 RTalk.setHdr(RTalk.DT_STRING,sl,rq,0);
290 RTalk.setHdr(RTalk.DT_SEXP,rl,rq,sl+4);
291 r.getBinaryRepresentation(rq,sl+((rl>0xfffff0)?12:8));
292 RPacket rp=rt.request(RTalk.CMD_setSEXP,rq);
293 if (rp!=null && rp.isOk()) return;
294 throw new RserveException(this,"assign failed",rp);
295 } catch (REXPMismatchException me) {
296 throw new RserveException(this, "Error creating binary representation: "+me.getMessage());
300 /** open a file on the Rserve for reading
301 @param fn file name. should not contain any path delimiters, since Rserve may restrict the access to local working directory.
302 @return input stream to be used for reading. Note that the stream is read-once only, there is no support for seek or rewind. */
303 public RFileInputStream openFile(String fn) throws IOException {
304 return new RFileInputStream(rt,fn);
307 /** create a file on the Rserve for writing
308 @param fn file name. should not contain any path delimiters, since Rserve may restrict the access to local working directory.
309 @return output stream to be used for writinging. Note that the stream is write-once only, there is no support for seek or rewind. */
310 public RFileOutputStream createFile(String fn) throws IOException {
311 return new RFileOutputStream(rt,fn);
314 /** remove a file on the Rserve
315 @param fn file name. should not contain any path delimiters, since Rserve may restrict the access to local working directory. */
316 public void removeFile(String fn) throws RserveException {
317 if (!connected || rt==null)
318 throw new RserveException(this,"Not connected");
319 RPacket rp=rt.request(RTalk.CMD_removeFile,fn);
320 if (rp!=null && rp.isOk()) return;
321 throw new RserveException(this,"removeFile failed",rp);
324 /** shutdown remote Rserve. Note that some Rserves cannot be shut down from the client side. */
325 public void shutdown() throws RserveException {
326 if (!connected || rt==null)
327 throw new RserveException(this,"Not connected");
329 RPacket rp=rt.request(RTalk.CMD_shutdown);
330 if (rp!=null && rp.isOk()) return;
331 throw new RserveException(this,"shutdown failed",rp);
334 /** Sets send buffer size of the Rserve (in bytes) for the current connection. All responses send by Rserve are stored in the send buffer before transmitting. This means that any objects you want to get from the Rserve need to fit into that buffer. By default the size of the send buffer is 2MB. If you need to receive larger objects from Rserve, you will need to use this function to enlarge the buffer. In order to save memory, you can also reduce the buffer size once it's not used anymore. Currently the buffer size is only limited by the memory available and/or 1GB (whichever is smaller). Current Rserve implementations won't go below buffer sizes of 32kb though. If the specified buffer size results in 'out of memory' on the server, the corresponding error is sent and the connection is terminated.<br>
335 <i>Note:</i> This command may go away in future versions of Rserve which will use dynamic send buffer allocation.
336 @param sbs send buffer size (in bytes) min=32k, max=1GB
338 public void setSendBufferSize(long sbs) throws RserveException {
339 if (!connected || rt==null)
340 throw new RserveException(this,"Not connected");
342 RPacket rp=rt.request(RTalk.CMD_setBufferSize,(int)sbs);
343 if (rp!=null && rp.isOk()) return;
344 throw new RserveException(this,"setSendBufferSize failed",rp);
347 /** set string encoding for this session. It is strongly
348 * recommended to make sure the encoding is always set to UTF-8
349 * because that is the only encoding supported by this Java
350 * client. It can be done either by uisng the
351 * <code>encoding</code> option in the server or by calling
352 * setStringEncoding("utf8") at the beginning of a session (but
354 @param enc name of the encoding as defined by Rserve - as of
355 Rserve version 0.5-3 valid values are "utf8", "latin1" and
356 "native" (case-sensitive)
359 public void setStringEncoding(String enc) throws RserveException {
360 if (!connected || rt==null)
361 throw new RserveException(this,"Not connected");
362 RPacket rp = rt.request(RTalk.CMD_setEncoding, enc);
363 if (rp != null && rp.isOk()) return;
364 throw new RserveException(this,"setStringEncoding failed", rp);
367 /** login using supplied user/pwd. Note that login must be the first
370 @param pwd password */
371 public void login(String user, String pwd) throws RserveException {
372 if (!authReq) return;
373 if (!connected || rt==null)
374 throw new RserveException(this,"Not connected");
375 if (authType==AT_crypt) {
376 if (Key==null) Key="rs";
377 RPacket rp=rt.request(RTalk.CMD_login,user+"\n"+jcrypt.crypt(Key,pwd));
378 if (rp!=null && rp.isOk()) return;
379 try { s.close(); } catch(Exception e) {};
380 is=null; os=null; s=null; connected=false;
381 throw new RserveException(this,"login failed",rp);
383 RPacket rp=rt.request(RTalk.CMD_login,user+"\n"+pwd);
384 if (rp!=null && rp.isOk()) return;
385 try {s.close();} catch (Exception e) {};
386 is=null; os=null; s=null; connected=false;
387 throw new RserveException(this,"login failed",rp);
391 /** detaches the session and closes the connection (requires Rserve 0.4+). The session can be only resumed by calling @link{RSession.attach} */
392 public RSession detach() throws RserveException {
393 if (!connected || rt==null)
394 throw new RserveException(this,"Not connected");
395 RPacket rp=rt.request(RTalk.CMD_detachSession);
396 if (rp==null || !rp.isOk())
397 throw new RserveException(this,"Cannot detach",rp);
398 RSession s = new RSession(this, rp);
403 /** check connection state. Note that currently this state is not checked on-the-spot,
404 that is if connection went down by an outside event this is not reflected by
406 @return <code>true</code> if this connection is alive */
407 public boolean isConnected() { return connected; }
409 /** check authentication requirement sent by server
410 @return <code>true</code> is server requires authentication. In such case first
411 command after connecting must be {@link #login}. */
412 public boolean needLogin() { return authReq; }
414 /** get last error string
415 @return last error string */
416 public String getLastError() { return lastError; }
418 /** evaluates the given command in the master server process asynchronously (control command). Note that control commands are always asynchronous, i.e., the expression is enqueued for evaluation in the master process and the method returns before the expression is evaluated (in non-parallel builds the client has to close the connection before the expression can be evaluated). There is no way to check for errors and control commands should be sent with utmost care as they can abort the server process. The evaluation has no immediate effect on the client session.
419 * @param cmd command/expression string
420 * @since Rserve 0.6-0 */
421 public void serverEval(String cmd) throws RserveException {
422 if (!connected || rt == null)
423 throw new RserveException(this, "Not connected");
424 RPacket rp = rt.request(RTalk.CMD_ctrlEval, cmd+"\n");
425 if (rp != null && rp.isOk()) return;
426 throw new RserveException(this,"serverEval failed",rp);
429 /** sources the given file (the path must be local to the server!) in the master server process asynchronously (control command). See {@link #serverEval()} for details on control commands.
430 * @param serverFile path to a file on the server (it is recommended to always use full paths, because the server process has a different working directory than the client child process!).
431 * @since Rserve 0.6-0 */
432 public void serverSource(String serverFile) throws RserveException {
433 if (!connected || rt == null)
434 throw new RserveException(this, "Not connected");
435 RPacket rp = rt.request(RTalk.CMD_ctrlSource, serverFile);
436 if (rp != null && rp.isOk()) return;
437 throw new RserveException(this,"serverSource failed",rp);
440 /** attempt to shut down the server process cleanly. Note that there is a fundamental difference between the {@link shutdown()} method and this method: <code>serverShutdown()</code> is a proper control command and thus fully authentication controllable, whereas {@link shutdown()} is a client-side command sent to the client child process and thus relying on the ability of the client to signal the server process which may be disabled. Therefore <code>serverShutdown()</code> is preferred and more reliable for Rserve 0.6-0 and higher.
441 * @since Rserve 0.6-0 */
442 public void serverShutdown() throws RserveException {
443 if (!connected || rt == null)
444 throw new RserveException(this, "Not connected");
445 RPacket rp = rt.request(RTalk.CMD_ctrlShutdown);
446 if (rp != null && rp.isOk()) return;
447 throw new RserveException(this,"serverShutdown failed",rp);
450 //========= REngine interface API
452 public REXP parse(String text, boolean resolve) throws REngineException {
453 throw new REngineException(this, "Rserve doesn't support separate parsing step.");
455 public REXP eval(REXP what, REXP where, boolean resolve) throws REngineException {
456 return new REXPNull();
458 public REXP parseAndEval(String text, REXP where, boolean resolve) throws REngineException {
459 if (where!=null) throw new REngineException(this, "Rserve doesn't support environments other than .GlobalEnv");
462 } catch (RserveException re) {
463 throw new REngineException(this, re.getMessage());
467 /** assign into an environment
468 @param symbol symbol name
469 @param value value to assign
470 @param env environment to assign to */
471 public void assign(String symbol, REXP value, REXP env) throws REngineException {
472 if (env!=null) throw new REngineException(this, "Rserve doesn't support environments other than .GlobalEnv");
474 assign(symbol, value);
475 } catch (RserveException re) {
476 throw new REngineException(this, re.getMessage());
480 /** get a value from an environment
481 @param symbol symbol name
482 @param env environment
483 @param resolve resolve the resulting REXP or just return a reference
485 public REXP get(String symbol, REXP env, boolean resolve) throws REngineException {
486 if (!resolve) throw new REngineException(this, "Rserve doesn't support references");
488 return eval("get(\""+symbol+"\")");
489 } catch (RserveException re) {
490 throw new REngineException(this, re.getMessage());
494 /** fetch the contents of the given reference. The resulting REXP may never be REXPReference.
495 @param ref reference to resolve
496 @return resolved reference */
497 public REXP resolveReference(REXP ref) throws REngineException {
498 throw new REngineException(this, "Rserve doesn't support references");
501 public REXP createReference(REXP ref) throws REngineException {
502 throw new REngineException(this, "Rserve doesn't support references");
504 public void finalizeReference(REXP ref) throws REngineException {
505 throw new REngineException(this, "Rserve doesn't support references");
508 public REXP getParentEnvironment(REXP env, boolean resolve) throws REngineException {
509 throw new REngineException(this, "Rserve doesn't support environments other than .GlobalEnv");
512 public REXP newEnvironment(REXP parent, boolean resolve) throws REngineException {
513 throw new REngineException(this, "Rserve doesn't support environments other than .GlobalEnv");