]> gerrit.simantics Code Review - simantics/r.git/blob - bundles/org.simantics.r.scl/src/org/rosuda/REngine/Rserve/RConnection.java
(refs #6833) Test RExp inheritance in SCL
[simantics/r.git] / bundles / org.simantics.r.scl / src / org / rosuda / REngine / Rserve / RConnection.java
1 package org.rosuda.REngine.Rserve;
2
3 // JRclient library - client interface to Rserve, see http://www.rosuda.org/Rserve/
4 // Copyright (C) 2004-08 Simon Urbanek
5 // --- for licensing information see LICENSE file in the original JRclient distribution ---
6
7 import java.util.*;
8 import java.io.*;
9 import java.net.*;
10 import org.rosuda.REngine.*;
11 import org.rosuda.REngine.Rserve.protocol.*;
12
13 /**  class providing TCP/IP connection to an Rserve
14      @version $Id$
15 */
16 public class RConnection extends REngine {
17     /** last error string */
18     String lastError=null;
19     Socket s;
20     boolean connected=false;
21     InputStream is;
22     OutputStream os;
23     boolean authReq=false;
24     int authType=AT_plain;
25     String Key=null;
26     RTalk rt=null;
27
28     String host;
29     int port;
30
31     /** This static variable specifies the character set used to encode string for transfer. Under normal circumstances there should be no reason for changing this variable. The default is UTF-8, which makes sure that 7-bit ASCII characters are sent in a backward-compatible fashion. Currently (Rserve 0.1-7) there is no further conversion on Rserve's side, i.e. the strings are passed to R without re-coding. If necessary the setting should be changed <u>before</u> connecting to the Rserve in case later Rserves will provide a possibility of setting the encoding during the handshake. */
32     public static String transferCharset="UTF-8";
33     
34     /** authorization type: plain text */
35     public static final int AT_plain = 0;
36     /** authorization type: unix crypt */
37     public static final int AT_crypt = 1;
38
39     /** version of the server (as reported in IDstring just after Rsrv) */
40     protected int rsrvVersion;
41     
42     /** make a new local connection on default port (6311) */
43     public RConnection() throws RserveException {
44                 this("127.0.0.1",6311);
45     }
46
47     /** make a new connection to specified host on default port (6311)
48         @param host host name/IP
49     */
50     public RConnection(String host) throws RserveException {
51                 this(host,6311);
52     }
53
54     /** make a new connection to specified host and given port.
55          * Make sure you check {@link #isConnected} to ensure the connection was successfully created.
56          * @param host host name/IP
57          * @param port TCP port
58          */
59     public RConnection(String host, int port) throws RserveException {
60                 this(host, port, null);
61     }
62
63         /** restore a connection based on a previously detached session
64          * @param session detached session object */
65     RConnection(RSession session) throws RserveException {
66                 this(null, 0, session);
67     }
68
69     RConnection(String host, int port, RSession session) throws RserveException {
70         try {
71             if (connected) s.close();
72             s=null;
73         } catch (Exception e) {
74             throw new RserveException(this,"Cannot connect: "+e.getMessage());
75         }
76                 if (session!=null) {
77                         host=session.host;
78                         port=session.port;
79                 }
80         connected=false;
81                 this.host=host;
82                 this.port=port;
83         try {
84             s=new Socket(host,port);
85                         // disable Nagle's algorithm since we really want immediate replies
86                         s.setTcpNoDelay(true);
87         } catch (Exception sce) {
88             throw new RserveException(this,"Cannot connect: "+sce.getMessage());
89         }
90         try {
91             is=s.getInputStream();
92             os=s.getOutputStream();
93         } catch (Exception gse) {
94             throw new RserveException(this,"Cannot get io stream: "+gse.getMessage());
95         }
96         rt=new RTalk(is,os);
97                 if (session==null) {
98                         byte[] IDs=new byte[32];
99                         int n=-1;
100                         try {
101                                 n=is.read(IDs);
102                         } catch (Exception sre) {
103                                 throw new RserveException(this,"Error while receiving data: "+sre.getMessage());
104                         }
105                         try {
106                                 if (n!=32) {
107                                         throw new RserveException(this,"Handshake failed: expected 32 bytes header, got "+n);
108                                 }
109                                 String ids=new String(IDs);
110                                 if (ids.substring(0,4).compareTo("Rsrv")!=0)
111                                         throw new RserveException(this,"Handshake failed: Rsrv signature expected, but received \""+ids+"\" instead.");
112                                 try {
113                                         rsrvVersion=Integer.parseInt(ids.substring(4,8));
114                                 } catch (Exception px) {}
115                                 // we support (knowingly) up to 103
116                                 if (rsrvVersion>103)
117                                         throw new RserveException(this,"Handshake failed: The server uses more recent protocol than this client.");
118                                 if (ids.substring(8,12).compareTo("QAP1")!=0)
119                                         throw new RserveException(this,"Handshake failed: unupported transfer protocol ("+ids.substring(8,12)+"), I talk only QAP1.");
120                                 for (int i=12;i<32;i+=4) {
121                                         String attr=ids.substring(i,i+4);
122                                         if (attr.compareTo("ARpt")==0) {
123                                                 if (!authReq) { // this method is only fallback when no other was specified
124                                                         authReq=true;
125                                                         authType=AT_plain;
126                                                 }
127                                         }
128                                         if (attr.compareTo("ARuc")==0) {
129                                                 authReq=true;
130                                                 authType=AT_crypt;
131                                         }
132                                         if (attr.charAt(0)=='K') {
133                                                 Key=attr.substring(1,3);
134                                         }
135                                 }
136                         } catch (RserveException innerX) {
137                                 try { s.close(); } catch (Exception ex01) {}; is=null; os=null; s=null;
138                                 throw innerX;
139                         }
140                 } else { // we have a session to take care of
141                         try {
142                                 os.write(session.key,0,32);
143                         } catch (Exception sre) {
144                                 throw new RserveException(this,"Error while sending session key: "+sre.getMessage());
145                         }
146                         rsrvVersion = session.rsrvVersion;
147                 }
148                 connected=true;
149                 lastError="OK";
150     }    
151         
152     public void finalize() {
153         close();
154         is=null;
155         os=null;
156     }
157
158     /** get server version as reported during the handshake.
159         @return server version as integer (Rsrv0100 will return 100) */
160     public int getServerVersion() {
161         return rsrvVersion;
162     }
163     
164     /** closes current connection */
165     public boolean close() {
166         try {
167             if (s != null) s.close();
168             connected = false;
169                         return true;
170         } catch(Exception e) { };
171                 return false;
172     }
173     
174     public String voidEvalS(String cmd) {
175         try {
176                 voidEval(cmd);
177                 return "";
178         } catch (RserveException e) {
179                 return e.getMessage();
180         }
181     }
182     
183     /** evaluates the given command, but does not fetch the result (useful for assignment
184         operations)
185         @param cmd command/expression string */
186     public void voidEval(String cmd) throws RserveException {
187                 if (!connected || rt==null)
188                         throw new RserveException(this,"Not connected");
189                 RPacket rp=rt.request(RTalk.CMD_voidEval,cmd+"\n");
190                 if (rp!=null && rp.isOk()) return;
191         throw new RserveException(this,"voidEval failed",rp);
192     }
193
194         /** evaluates the given command, detaches the session (see @link{detach()}) and closes connection while the command is being evaluted (requires Rserve 0.4+).
195                 Note that a session cannot be attached again until the commad was successfully processed. Techincally the session is put into listening mode while the command is being evaluated but accept is called only after the command was evaluated. One commonly used techique to monitor detached working sessions is to use second connection to poll the status (e.g. create a temporary file and return the full path before detaching thus allowing new connections to read it).
196                 @param cmd command/expression string
197                 @return session object that can be use to attach back to the session once the command completed */
198     public RSession voidEvalDetach(String cmd) throws RserveException {
199                 if (!connected || rt==null)
200                         throw new RserveException(this,"Not connected");
201                 RPacket rp=rt.request(RTalk.CMD_detachedVoidEval,cmd+"\n");
202                 if (rp==null || !rp.isOk())
203                         throw new RserveException(this,"detached void eval failed",rp);
204                 RSession s = new RSession(this, rp);
205                 close();
206                 return s;
207     }
208         
209     REXP parseEvalResponse(RPacket rp) throws RserveException {
210                 int rxo=0;
211                 byte[] pc=rp.getCont();
212                 if (rsrvVersion>100) { /* since 0101 eval responds correctly by using DT_SEXP type/len header which is 4 bytes long */
213                         rxo=4;
214                         /* we should check parameter type (should be DT_SEXP) and fail if it's not */
215                         if (pc[0]!=RTalk.DT_SEXP && pc[0]!=(RTalk.DT_SEXP|RTalk.DT_LARGE))
216                                 throw new RserveException(this,"Error while processing eval output: SEXP (type "+RTalk.DT_SEXP+") expected but found result type "+pc[0]+".");
217                         if (pc[0]==(RTalk.DT_SEXP|RTalk.DT_LARGE))
218                                 rxo=8; // large data need skip of 8 bytes
219                         /* warning: we are not checking or using the length - we assume that only the one SEXP is returned. This is true for the current CMD_eval implementation, but may not be in the future. */
220                 }
221                 if (pc.length>rxo) {
222                         try {
223                                 REXPFactory rx=new REXPFactory();
224                                 rx.parseREXP(pc, rxo);
225                                 return rx.getREXP();
226                         } catch (REXPMismatchException me) {
227                                 me.printStackTrace();
228                                 throw new RserveException(this, "Error when parsing response: "+me.getMessage());
229                         }
230                 }
231                 return null;
232     }
233
234     /** evaluates the given command and retrieves the result
235         @param cmd command/expression string
236         @return R-xpression or <code>null</code> if an error occured */
237     public REXP eval(String cmd) throws RserveException {
238                 if (!connected || rt==null)
239             throw new RserveException(this,"Not connected");
240                 RPacket rp=rt.request(RTalk.CMD_eval,cmd+"\n");
241                 if (rp!=null && rp.isOk())
242                         return parseEvalResponse(rp);
243         throw new RserveException(this,"eval failed",rp);
244     }
245
246     /** assign a string value to a symbol in R. The symbol is created if it doesn't exist already.
247         @param sym symbol name. Currently assign uses CMD_setSEXP command of Rserve, i.e. the symbol value is NOT parsed. It is the responsibility of the user to make sure that the symbol name is valid in R (recall the difference between a symbol and an expression!). In fact R will always create the symbol, but it may not be accessible (examples: "bar\nfoo" or "bar$foo").
248         @param ct contents
249         */
250     public void assign(String sym, String ct) throws RserveException {
251                 if (!connected || rt==null)
252             throw new RserveException(this,"Not connected");
253         byte[] symn=sym.getBytes();
254         byte[] ctn=ct.getBytes();
255         int sl=symn.length+1;
256         int cl=ctn.length+1;
257         if ((sl&3)>0) sl=(sl&0xfffffc)+4; // make sure the symbol length is divisible by 4
258         if ((cl&3)>0) cl=(cl&0xfffffc)+4; // make sure the content length is divisible by 4
259         byte[] rq=new byte[sl+4+cl+4];
260         int ic;
261         for(ic=0;ic<symn.length;ic++) rq[ic+4]=symn[ic];
262         while (ic<sl) { rq[ic+4]=0; ic++; }
263         for(ic=0;ic<ctn.length;ic++) rq[ic+sl+8]=ctn[ic];
264         while (ic<cl) { rq[ic+sl+8]=0; ic++; }
265                 RTalk.setHdr(RTalk.DT_STRING,sl,rq,0);
266                 RTalk.setHdr(RTalk.DT_STRING,cl,rq,sl+4);
267                 RPacket rp=rt.request(RTalk.CMD_setSEXP,rq);
268         if (rp!=null && rp.isOk()) return;
269         throw new RserveException(this,"assign failed",rp);
270     }
271
272     /** assign a content of a REXP to a symbol in R. The symbol is created if it doesn't exist already.
273      * @param sym symbol name. Currently assign uses CMD_setSEXP command of Rserve, i.e. the symbol value is NOT parsed. It is the responsibility of the user to make sure that the symbol name is valid in R (recall the difference between a symbol and an expression!). In fact R will always create the symbol, but it may not be accessible (examples: "bar\nfoo" or "bar$foo").
274          * @param rexp contents
275          */
276 public void assign(String sym, REXP rexp) throws RserveException {
277         if (!connected || rt==null)
278             throw new RserveException(this,"Not connected");
279         try {
280                 REXPFactory r = new REXPFactory(rexp);
281                 int rl=r.getBinaryLength();
282                 byte[] symn=sym.getBytes();
283                 int sl=symn.length+1;
284                 if ((sl&3)>0) sl=(sl&0xfffffc)+4; // make sure the symbol length is divisible by 4
285                 byte[] rq=new byte[sl+rl+((rl>0xfffff0)?12:8)];
286                 int ic;
287                 for(ic=0;ic<symn.length;ic++) rq[ic+4]=symn[ic];
288                 while(ic<sl) { rq[ic+4]=0; ic++; }; // pad with 0
289                 RTalk.setHdr(RTalk.DT_STRING,sl,rq,0);
290                 RTalk.setHdr(RTalk.DT_SEXP,rl,rq,sl+4);
291                 r.getBinaryRepresentation(rq,sl+((rl>0xfffff0)?12:8));
292                 RPacket rp=rt.request(RTalk.CMD_setSEXP,rq);
293                 if (rp!=null && rp.isOk()) return;
294                 throw new RserveException(this,"assign failed",rp);
295         } catch (REXPMismatchException me) {
296                 throw new RserveException(this, "Error creating binary representation: "+me.getMessage());
297         }
298 }
299
300     /** open a file on the Rserve for reading
301         @param fn file name. should not contain any path delimiters, since Rserve may restrict the access to local working directory.
302         @return input stream to be used for reading. Note that the stream is read-once only, there is no support for seek or rewind. */
303     public RFileInputStream openFile(String fn) throws IOException {
304                 return new RFileInputStream(rt,fn);
305     }
306
307     /** create a file on the Rserve for writing
308         @param fn file name. should not contain any path delimiters, since Rserve may restrict the access to local working directory.
309         @return output stream to be used for writinging. Note that the stream is write-once only, there is no support for seek or rewind. */
310     public RFileOutputStream createFile(String fn) throws IOException {
311         return new RFileOutputStream(rt,fn);
312     }
313
314     /** remove a file on the Rserve
315         @param fn file name. should not contain any path delimiters, since Rserve may restrict the access to local working directory. */
316     public void removeFile(String fn) throws RserveException {
317                 if (!connected || rt==null)
318                         throw new RserveException(this,"Not connected");
319                 RPacket rp=rt.request(RTalk.CMD_removeFile,fn);
320                 if (rp!=null && rp.isOk()) return;
321         throw new RserveException(this,"removeFile failed",rp);
322     }
323
324     /** shutdown remote Rserve. Note that some Rserves cannot be shut down from the client side. */
325     public void shutdown() throws RserveException {
326                 if (!connected || rt==null)
327                         throw new RserveException(this,"Not connected");
328
329                 RPacket rp=rt.request(RTalk.CMD_shutdown);
330                 if (rp!=null && rp.isOk()) return;
331         throw new RserveException(this,"shutdown failed",rp);
332     }
333
334     /** Sets send buffer size of the Rserve (in bytes) for the current connection. All responses send by Rserve are stored in the send buffer before transmitting. This means that any objects you want to get from the Rserve need to fit into that buffer. By default the size of the send buffer is 2MB. If you need to receive larger objects from Rserve, you will need to use this function to enlarge the buffer. In order to save memory, you can also reduce the buffer size once it's not used anymore. Currently the buffer size is only limited by the memory available and/or 1GB (whichever is smaller). Current Rserve implementations won't go below buffer sizes of 32kb though. If the specified buffer size results in 'out of memory' on the server, the corresponding error is sent and the connection is terminated.<br>
335         <i>Note:</i> This command may go away in future versions of Rserve which will use dynamic send buffer allocation.
336         @param sbs send buffer size (in bytes) min=32k, max=1GB
337      */
338     public void setSendBufferSize(long sbs) throws RserveException {
339         if (!connected || rt==null)
340                         throw new RserveException(this,"Not connected");
341
342         RPacket rp=rt.request(RTalk.CMD_setBufferSize,(int)sbs);
343         if (rp!=null && rp.isOk()) return;
344         throw new RserveException(this,"setSendBufferSize failed",rp);        
345     }
346
347     /** set string encoding for this session. It is strongly
348      * recommended to make sure the encoding is always set to UTF-8
349      * because that is the only encoding supported by this Java
350      * client. It can be done either by uisng the
351      * <code>encoding</code> option in the server or by calling
352      * setStringEncoding("utf8") at the beginning of a session (but
353      * after login).
354      @param enc name of the encoding as defined by Rserve - as of
355      Rserve version 0.5-3 valid values are "utf8", "latin1" and
356      "native" (case-sensitive)
357      @since Rserve 0.5-3
358     */
359     public void setStringEncoding(String enc) throws RserveException {
360         if (!connected || rt==null)
361                         throw new RserveException(this,"Not connected");
362         RPacket rp = rt.request(RTalk.CMD_setEncoding, enc);
363         if (rp != null && rp.isOk()) return;
364         throw new RserveException(this,"setStringEncoding failed", rp);
365     }
366
367     /** login using supplied user/pwd. Note that login must be the first
368         command if used
369         @param user username
370         @param pwd password */
371     public void login(String user, String pwd) throws RserveException {
372                 if (!authReq) return;
373                 if (!connected || rt==null)
374                         throw new RserveException(this,"Not connected");
375                 if (authType==AT_crypt) {
376                         if (Key==null) Key="rs";
377                         RPacket rp=rt.request(RTalk.CMD_login,user+"\n"+jcrypt.crypt(Key,pwd));
378                         if (rp!=null && rp.isOk()) return;
379                         try { s.close(); } catch(Exception e) {};
380                         is=null; os=null; s=null; connected=false;
381             throw new RserveException(this,"login failed",rp);
382                 }
383                 RPacket rp=rt.request(RTalk.CMD_login,user+"\n"+pwd);
384                 if (rp!=null && rp.isOk()) return;
385                 try {s.close();} catch (Exception e) {};
386                 is=null; os=null; s=null; connected=false;
387         throw new RserveException(this,"login failed",rp);
388     }
389
390     
391     /** detaches the session and closes the connection (requires Rserve 0.4+). The session can be only resumed by calling @link{RSession.attach} */
392         public RSession detach() throws RserveException {
393                 if (!connected || rt==null)
394             throw new RserveException(this,"Not connected");
395                 RPacket rp=rt.request(RTalk.CMD_detachSession);
396                 if (rp==null || !rp.isOk())
397                         throw new RserveException(this,"Cannot detach",rp);
398                 RSession s = new RSession(this, rp);
399                 close();
400                 return s;
401     }
402
403     /** check connection state. Note that currently this state is not checked on-the-spot,
404         that is if connection went down by an outside event this is not reflected by
405         the flag
406         @return <code>true</code> if this connection is alive */
407     public boolean isConnected() { return connected; }
408     
409     /** check authentication requirement sent by server
410         @return <code>true</code> is server requires authentication. In such case first
411         command after connecting must be {@link #login}. */
412     public boolean needLogin() { return authReq; }
413     
414     /** get last error string
415         @return last error string */
416     public String getLastError() { return lastError; }
417         
418     /** evaluates the given command in the master server process asynchronously (control command). Note that control commands are always asynchronous, i.e., the expression is enqueued for evaluation in the master process and the method returns before the expression is evaluated (in non-parallel builds the client has to close the connection before the expression can be evaluated). There is no way to check for errors and control commands should be sent with utmost care as they can abort the server process. The evaluation has no immediate effect on the client session.
419      *  @param cmd command/expression string 
420      *  @since Rserve 0.6-0 */
421     public void serverEval(String cmd) throws RserveException {
422         if (!connected || rt == null)
423             throw new RserveException(this, "Not connected");
424         RPacket rp = rt.request(RTalk.CMD_ctrlEval, cmd+"\n");
425         if (rp != null && rp.isOk()) return;
426         throw new RserveException(this,"serverEval failed",rp);
427     }
428     
429     /** sources the given file (the path must be local to the server!) in the master server process asynchronously (control command). See {@link #serverEval()} for details on control commands.
430      *  @param serverFile path to a file on the server (it is recommended to always use full paths, because the server process has a different working directory than the client child process!).
431      *  @since Rserve 0.6-0 */
432     public void serverSource(String serverFile) throws RserveException {
433         if (!connected || rt == null)
434             throw new RserveException(this, "Not connected");
435         RPacket rp = rt.request(RTalk.CMD_ctrlSource, serverFile);
436         if (rp != null && rp.isOk()) return;
437         throw new RserveException(this,"serverSource failed",rp);
438     }
439     
440     /** attempt to shut down the server process cleanly. Note that there is a fundamental difference between the {@link shutdown()} method and this method: <code>serverShutdown()</code> is a proper control command and thus fully authentication controllable, whereas {@link shutdown()} is a client-side command sent to the client child process and thus relying on the ability of the client to signal the server process which may be disabled. Therefore <code>serverShutdown()</code> is preferred and more reliable for Rserve 0.6-0 and higher.
441      *  @since Rserve 0.6-0 */
442     public void serverShutdown() throws RserveException {
443         if (!connected || rt == null)
444             throw new RserveException(this, "Not connected");
445         RPacket rp = rt.request(RTalk.CMD_ctrlShutdown);
446         if (rp != null && rp.isOk()) return;
447         throw new RserveException(this,"serverShutdown failed",rp);
448     }
449     
450 //========= REngine interface API
451
452 public REXP parse(String text, boolean resolve) throws REngineException {
453         throw new REngineException(this, "Rserve doesn't support separate parsing step.");
454 }
455 public REXP eval(REXP what, REXP where, boolean resolve) throws REngineException {
456         return new REXPNull();
457 }
458 public REXP parseAndEval(String text, REXP where, boolean resolve) throws REngineException {
459         if (where!=null) throw new REngineException(this, "Rserve doesn't support environments other than .GlobalEnv");
460         try {
461                 return eval(text);
462         } catch (RserveException re) {
463                 throw new REngineException(this, re.getMessage());
464         }
465 }
466
467 /** assign into an environment
468 @param symbol symbol name
469 @param value value to assign
470 @param env environment to assign to */
471 public void assign(String symbol, REXP value, REXP env) throws REngineException {
472         if (env!=null) throw new REngineException(this, "Rserve doesn't support environments other than .GlobalEnv");
473         try {
474                 assign(symbol, value);
475         } catch (RserveException re) {
476                 throw new REngineException(this, re.getMessage());
477         }
478 }
479
480 /** get a value from an environment
481 @param symbol symbol name
482 @param env environment
483 @param resolve resolve the resulting REXP or just return a reference            
484 @return value */
485 public REXP get(String symbol, REXP env, boolean resolve) throws REngineException {
486         if (!resolve) throw new REngineException(this, "Rserve doesn't support references");
487         try {
488                 return eval("get(\""+symbol+"\")");
489         } catch (RserveException re) {
490                 throw new REngineException(this, re.getMessage());
491         }
492 }
493
494 /** fetch the contents of the given reference. The resulting REXP may never be REXPReference.
495 @param ref reference to resolve
496 @return resolved reference */
497 public REXP resolveReference(REXP ref) throws REngineException {
498         throw new REngineException(this, "Rserve doesn't support references");
499 }
500         
501         public REXP createReference(REXP ref) throws REngineException {
502                 throw new REngineException(this, "Rserve doesn't support references");
503         }
504         public void finalizeReference(REXP ref) throws REngineException {
505                 throw new REngineException(this, "Rserve doesn't support references");
506         }
507         
508 public REXP getParentEnvironment(REXP env, boolean resolve) throws REngineException {
509         throw new REngineException(this, "Rserve doesn't support environments other than .GlobalEnv");
510 }
511
512 public REXP newEnvironment(REXP parent, boolean resolve) throws REngineException {
513         throw new REngineException(this, "Rserve doesn't support environments other than .GlobalEnv");
514 }
515
516 }
517