File Coverage

blib/lib/Statistics/R/IO/Rserve.pm
Criterion Covered Total %
statement 88 113 77.8
branch 27 54 50.0
condition 9 23 39.1
subroutine 20 23 86.9
pod 4 7 57.1
total 148 220 67.2


line stmt bran cond sub pod time code
1             package Statistics::R::IO::Rserve;
2             # ABSTRACT: Supply object methods for Rserve communication
3             $Statistics::R::IO::Rserve::VERSION = '1.0002';
4 9     9   321752 use 5.010;
  9         48  
5              
6 9     9   1847 use Class::Tiny::Antlers;
  9         19808  
  9         62  
7              
8 9     9   3161 use Statistics::R::IO::REXPFactory;
  9         24  
  9         347  
9 9     9   3003 use Statistics::R::IO::QapEncoding;
  9         25  
  9         406  
10              
11 9     9   1973 use Socket;
  9         14121  
  9         3799  
12 9     9   1461 use IO::Socket::INET ();
  9         62439  
  9         201  
13 9     9   57 use Scalar::Util qw(blessed looks_like_number openhandle);
  9         19  
  9         427  
14 9     9   47 use Carp;
  9         30  
  9         353  
15              
16 9     9   59 use namespace::clean;
  9         19  
  9         61  
17              
18              
19             has fh => (
20             is => 'ro',
21             default => sub {
22             my $self = shift;
23             my $fh;
24             if ($self->_usesocket) {
25             socket($fh, PF_INET, SOCK_STREAM, getprotobyname('tcp')) ||
26             croak "socket: $!";
27             connect($fh, sockaddr_in($self->port, inet_aton($self->server))) ||
28             croak "connect: $!";
29             bless $fh, 'IO::Handle'
30             }
31             else {
32             $fh = IO::Socket::INET->new(PeerAddr => $self->server,
33             PeerPort => $self->port) or
34             croak $!
35             }
36             $self->_set_autoclose(1) unless defined($self->_autoclose);
37             my ($response, $rc) = '';
38             while ($rc = $fh->read($response, 32 - length $response,
39             length $response)) {}
40             croak $! unless defined $rc;
41              
42             croak "Unrecognized server ID" unless
43             substr($response, 0, 12) eq 'Rsrv0103QAP1';
44             $fh
45             },
46             );
47              
48             has server => (
49             is => 'ro',
50             default => 'localhost',
51             );
52              
53             has port => (
54             is => 'ro',
55             default => 6311,
56             );
57              
58              
59             has _autoclose => (
60             is => 'ro',
61             );
62              
63              
64             has _autoflush => (
65             is => 'ro',
66             default => sub {
67             my $self = shift;
68             $self->_usesocket ? 1 : 0
69             },
70             );
71              
72             has _usesocket => (
73             is => 'ro',
74             default => 0
75             );
76              
77              
78             use constant {
79 9         8576 CMD_login => 0x001, # "name\npwd" : -
80             CMD_voidEval => 0x002, # string : -
81             CMD_eval => 0x003, # string | encoded SEXP : encoded SEXP
82             CMD_shutdown => 0x004, # [admin-pwd] : -
83              
84             # security/encryption - all since 1.7-0
85             CMD_switch => 0x005, # string (protocol) : -
86             CMD_keyReq => 0x006, # string (request) : bytestream (key)
87             CMD_secLogin => 0x007, # bytestream (encrypted auth) : -
88             CMD_OCcall => 0x00f, # SEXP : SEXP -- it is the only command
89             # supported in object-capability mode and it
90             # requires that the SEXP is a language
91             # construct with OC reference in the first
92             # position
93             CMD_OCinit => 0x434f7352, # SEXP -- 'RsOC' - command sent from the
94             # server in OC mode with the packet of
95             # initial capabilities. file I/O
96             # routines. server may answe
97             CMD_openFile => 0x010, # fn : -
98             CMD_createFile => 0x011, # fn : -
99             CMD_closeFile => 0x012, # - : -
100             CMD_readFile => 0x013, # [int size] : data... ; if size not
101             # present, server is free to choose any
102             # value - usually it uses the size of its
103             # static buffer
104             CMD_writeFile => 0x014, # data : -
105             CMD_removeFile => 0x015, # fn : -
106              
107             # object manipulation
108             CMD_setSEXP => 0x020, # string(name), REXP : -
109             CMD_assignSEXP => 0x021, # string(name), REXP : - ; same as
110             # setSEXP except that the name is parsed
111              
112             # session management (since 0.4-0)
113             CMD_detachSession => 0x030, # : session key
114             CMD_detachedVoidEval => 0x031, # string : session key; doesn't
115             CMD_attachSession => 0x032, # session key : -
116              
117             # control commands (since 0.6-0) - passed on to the master process
118             # Note: currently all control commands are asynchronous, i.e. RESP_OK
119             # indicates that the command was enqueued in the master pipe, but there
120             # is no guarantee that it will be processed. Moreover non-forked
121             # connections (e.g. the default debug setup) don't process any
122             # control commands until the current client connection is closed so
123             # the connection issuing the control command will never see its
124             # result.
125             CMD_ctrl => 0x40, # -- not a command - just a constant --
126             CMD_ctrlEval => 0x42, # string : -
127             CMD_ctrlSource => 0x45, # string : -
128             CMD_ctrlShutdown => 0x44, # - : -
129              
130             # 'internal' commands (since 0.1-9)
131             CMD_setBufferSize => 0x081, # [int sendBufSize] this command allows
132             # clients to request bigger buffer
133             # sizes if large data is to be
134             # transported from Rserve to the
135             # client. (incoming buffer is resized
136             # automatically)
137             CMD_setEncoding => 0x082, # string (one of "native","latin1","utf8") : -; since 0.5-3
138              
139             # special commands - the payload of packages with this mask does not contain defined parameters
140             CMD_SPECIAL_MASK => 0xf0,
141             CMD_serEval => 0xf5, # serialized eval - the packets are raw
142             # serialized data without data header
143             CMD_serAssign => 0xf6, # serialized assign - serialized list with
144             # [[1]]=name, [[2]]=value
145             CMD_serEEval => 0xf7, # serialized expression eval - like serEval
146             # with one additional evaluation round
147 9     9   19498 };
  9         20  
148              
149              
150             sub BUILDARGS {
151 155     155 0 242813 my $class = shift;
152            
153 155 50       714 if ( scalar @_ == 0 ) {
    100          
    50          
154             return { }
155 0         0 } elsif ( scalar @_ == 1 ) {
156 51 50       176 if ( ref $_[0] eq 'HASH' ) {
    100          
157 0         0 my $args = { %{ $_[0] } };
  0         0  
158 0 0       0 if (my $fh = $args->{fh}) {
159 0         0 ($args->{server}, $args->{port}) = _fh_host_port($fh);
160             }
161 0         0 return $args
162             } elsif (ref $_[0] eq '') {
163 1         3 my $server = shift;
164 1         3 return { server => $server }
165             } else {
166 50         75 my $fh = shift;
167 50         122 my ($server, $port) = _fh_host_port($fh);
168 50         6038 return { fh => $fh,
169             server => $server,
170             port => $port,
171             _autoclose => 0,
172             _autoflush => ref($fh) eq 'GLOB' }
173             }
174             }
175             elsif ( @_ % 2 ) {
176 0         0 die "The new() method for $class expects a hash reference or a key/value list."
177             . " You passed an odd number of arguments\n";
178             }
179             else {
180 104         318 my $args = { @_ };
181 104 100       774 if (my $fh = $args->{fh}) {
182 102         301 ($args->{server}, $args->{port}) = _fh_host_port($fh);
183             }
184 104         12469 return $args
185             }
186             }
187              
188              
189             sub BUILD {
190 155     155 0 3807 my ($self, $args) = @_;
191              
192             # Required attribute types
193             die "Attribute 'fh' must be an instance of IO::Handle or an open filehandle" if
194             defined($args->{fh}) &&
195             !((ref($args->{fh}) eq "GLOB" && Scalar::Util::openhandle($args->{fh})) ||
196 155 50 33     1282 (blessed($args->{fh}) && $args->{fh}->isa("IO::Handle")));
      66        
197             die "Attribute 'server' must be scalar value" if
198 155 100 66     2721 exists($args->{server}) && (!defined($args->{server}) || ref($args->{server}));
      33        
199 152 50 33     3733 die "Attribute 'port' must be an integer" unless
200             looks_like_number($self->port) && (int($self->port) == $self->port);
201             }
202              
203              
204             ## Extracts host address and port from the given socket handle (either
205             ## as an object or a "classic" socket)
206             sub _fh_host_port {
207 152 50   152   393 my $fh = shift or return;
208 152 50 33     994 if (ref($fh) eq 'GLOB') {
    50          
209 0 0       0 my ($port, $host) = unpack_sockaddr_in(getpeername($fh)) or return;
210 0         0 my $name = gethostbyaddr($host, AF_INET);
211 0   0     0 return ($name // inet_ntoa($host), $port)
212             } elsif (blessed($fh) && $fh->isa('IO::Socket')){
213 152         2582 return ($fh->peerhost, $fh->peerport)
214             }
215             return undef
216 0         0 }
217              
218              
219             ## Private setter for autoclose used in the default handler of 'fh'
220             sub _set_autoclose {
221 0     0   0 my $self = shift;
222             $self->{_autoclose} = shift
223 0         0 }
224              
225              
226             sub eval {
227 101     101 1 2024 my ($self, $expr) = (shift, shift);
228              
229             # Encode $expr as DT_STRING
230 101         438 my $parameter = pack('VZ*',
231             ((length($expr)+1) << 8) + 4,
232             $expr);
233              
234 101         292 my $data = $self->_send_command(CMD_eval, $parameter);
235              
236 99         178 my ($value, $state) = @{Statistics::R::IO::QapEncoding::decode($data)};
  99         321  
237 99 50       251 croak 'Could not parse Rserve value' unless $state;
238 99 50       249 croak 'Unread data remaining in the Rserve response' unless $state->eof;
239 99         704 $value
240             }
241              
242              
243             sub ser_eval {
244 51     51 1 1884 my ($self, $rexp) = (shift, shift);
245            
246             ## simulate the request parameter as constructed by:
247             ## > serialize(quote(parse(text="{$rexp}")[[1]]), NULL)
248 51         261 my $parameter =
249             "\x58\x0a\0\0\0\2\0\3\0\3\0\2\3\0\0\0\0\6\0\0\0\1\0\4\0" .
250             "\x09\0\0\0\2\x5b\x5b\0\0\0\2\0\0\0\6\0\0\0\1\0\4\0\x09\0\0" .
251             "\0\5\x70\x61\x72\x73\x65\0\0\4\2\0\0\0\1\0\4\0\x09\0\0\0\4\x74\x65" .
252             "\x78\x74\0\0\0\x10\0\0\0\1\0\4\0\x09" .
253             pack('N', length($rexp)+2) .
254             "\x7b" . $rexp . "\x7d" .
255             "\0\0\0\xfe\0\0\0\2\0\0\0\x0e\0\0\0\1\x3f\xf0\0\0\0\0\0\0" .
256             "\0\0\0\xfe";
257             ## request is:
258             ## - command (0xf5, CMD_serEval,
259             ## means raw serialized data without data header)
260 51         132 my $data = $self->_send_command(CMD_serEval, $parameter);
261            
262 50         77 my ($value, $state) = @{Statistics::R::IO::REXPFactory::unserialize($data)};
  50         152  
263 50 50       124 croak 'Could not parse Rserve value' unless $state;
264 50 50       120 croak 'Unread data remaining in the Rserve response' unless $state->eof;
265 50         357 $value
266             }
267              
268              
269             sub get_file {
270 0     0 1 0 my ($self, $remote, $local) = (shift, shift, shift);
271              
272 0         0 my $data = pack 'C*', @{$self->eval("readBin('$remote', what='raw', n=file.info('$remote')[['size']])")->to_pl};
  0         0  
273              
274 0 0       0 if ($local) {
275 0 0       0 open my $local_file, '>:raw', $local or
276             croak "Cannot open $!";
277            
278 0         0 print $local_file $data;
279            
280 0         0 close $local_file;
281             }
282            
283             $data
284 0         0 }
285              
286              
287             use constant {
288 9         701 CMD_RESP => 0x10000, # all responses have this flag set
289             CMD_OOB => 0x20000, # out-of-band data - i.e. unsolicited messages
290 9     9   67 };
  9         19  
291              
292             use constant {
293 9         2861 RESP_OK => (CMD_RESP|0x0001), # command succeeded; returned
294             # parameters depend on the command
295             # issued
296             RESP_ERR => (CMD_RESP|0x0002), # command failed, check stats code
297             # attached string may describe the
298             # error
299             OOB_SEND => (CMD_OOB | 0x1000), # OOB send - unsolicited SEXP sent
300             # from the R instance to the
301             # client. 12 LSB are reserved for
302             # application-specific code
303             OOB_MSG => (CMD_OOB | 0x2000), # OOB message - unsolicited message
304             # sent from the R instance to the
305             # client requiring a response. 12
306             # LSB are reserved for
307             # application-specific code
308 9     9   52 };
  9         17  
309              
310              
311             ## Sends a request to Rserve and receives the response, checking for
312             ## any errors.
313             ##
314             ## Returns the data portion of the server response
315             sub _send_command {
316 152   50 152   440 my ($self, $command, $parameters) = (shift, shift, shift || '');
317            
318             ## request is (byte order is little-endian):
319             ## - command (4 bytes)
320             ## - length of the message (low 32 bits)
321             ## - offset of the data part (normally 0)
322             ## - high 32 bits of the length of the message (0 if < 4GB)
323 152         2616 $self->fh->print(pack('V4', $command, length($parameters), 0, 0) .
324             $parameters);
325 152 50       13003 $self->fh->flush if $self->_autoflush;
326            
327 152         2779 my $response = $self->_receive_response(16);
328             ## Of the next four long-ints:
329             ## - the first one is status and should be 65537 (bytes \1, \0, \1, \0)
330             ## - the second one is length
331             ## - the third and fourth are ??
332 152         612 my ($status, $length) = unpack VV => substr($response, 0, 8);
333 152 50       412 if ($status & CMD_RESP) {
    0          
334 152 100       363 unless ($status == RESP_OK) {
335 3         448 croak 'R server returned an error: ' . sprintf("0x%X", $status)
336             }
337             }
338             elsif ($status & CMD_OOB) {
339 0         0 croak 'OOB messages are not supported yet'
340             }
341             else {
342 0         0 croak 'Unrecognized response type: ' . $status
343             }
344            
345 149         329 $self->_receive_response($length)
346             }
347              
348              
349             sub _receive_response {
350 301     301   600 my ($self, $length) = (shift, shift);
351            
352 301         559 my ($response, $offset, $rc) = ('', 0);
353 301         4804 while ($rc = $self->fh->read($response, $length - $offset, $offset)) {
354 298         20878 $offset += $rc;
355 298 50       712 last if $length == $offset;
356             }
357 301 50       813 croak $! unless defined $rc;
358 301         652 $response
359             }
360              
361              
362             sub close {
363 0     0 1 0 my $self = shift;
364 0         0 $self->fh->close
365             }
366              
367              
368             sub DEMOLISH {
369 155     155 0 596303 my $self = shift;
370 155 50       3267 $self->close if $self->_autoclose
371             }
372              
373              
374             1;
375              
376             __END__