File Coverage

blib/lib/Perinci/Access/Simple/Client.pm
Criterion Covered Total %
statement 102 192 53.1
branch 40 86 46.5
condition 14 23 60.8
subroutine 17 23 73.9
pod 6 6 100.0
total 179 330 54.2


line stmt bran cond sub pod time code
1             package Perinci::Access::Simple::Client;
2              
3             our $DATE = '2016-10-07'; # DATE
4             our $VERSION = '0.22'; # VERSION
5              
6 1     1   19082 use 5.010001;
  1         3  
7 1     1   3 use strict;
  1         1  
  1         15  
8 1     1   2 use warnings;
  1         2  
  1         20  
9 1     1   370 use Log::Any '$log';
  1         8710  
  1         4  
10              
11 1     1   2811 use Cwd qw(abs_path);
  1         1  
  1         36  
12 1     1   379 use Perinci::AccessUtil qw(strip_riap_stuffs_from_res);
  1         1351  
  1         50  
13 1     1   426 use POSIX qw(:sys_wait_h);
  1         4184  
  1         4  
14 1     1   1353 use Tie::Cache;
  1         1798  
  1         27  
15 1     1   345 use URI::Split qw(uri_split);
  1         1697  
  1         51  
16 1     1   13 use URI::Escape;
  1         1  
  1         41  
17              
18 1     1   3 use parent qw(Perinci::Access::Base);
  1         2  
  1         5  
19              
20             my @logging_methods = Log::Any->logging_methods();
21              
22             sub new {
23 2     2 1 2733 my $class = shift;
24              
25 2         15 my $self = $class->SUPER::new(@_);
26              
27             # attributes
28 2   50     33 $self->{retries} //= 2;
29 2   50     10 $self->{retry_delay} //= 3;
30 2   50     15 $self->{conn_cache_size} //= 32;
31              
32             # connection cache, key="tcp:HOST:PORT" OR "unix:ABSPATH" or "pipe:ABSPATH
33             # ARGS". value=hash, for tcp & unix {socket=>...} and for pipe {pid=>...,
34             # chld_out=>..., chld_in=>...}
35 2         17 tie my(%conns), 'Tie::Cache', $self->{conn_cache_size};
36 2         69 $self->{_conns} = \%conns;
37              
38 2         4 $self;
39             }
40              
41             # for older Perinci::Access::Base 0.28-, to remove later
42       0     sub _init {}
43              
44             sub _delete_cache {
45 0     0   0 my ($self, $wanted) = @_;
46 0         0 my $conns = $self->{_conns};
47 0 0       0 return unless $conns;
48              
49 0 0       0 for my $k ($wanted ? ($wanted) : (keys %$conns)) {
50 0 0       0 if ($k =~ /^pipe:/) {
51 0         0 waitpid($conns->{$k}{pid}, WNOHANG);
52             }
53 0         0 delete $self->{_conns}{$k};
54             }
55             }
56              
57             sub DESTROY {
58 1     1   650 my ($self) = @_;
59              
60             #$self->_delete_cache;
61             }
62              
63             sub request {
64 0     0 1 0 my $self = shift;
65 0         0 $self->_parse_or_request('request', @_);
66             }
67              
68             sub _parse {
69 12     12   18761 my $self = shift;
70 12         28 $self->_parse_or_request('parse2', @_);
71             }
72              
73             # which: parse0 = quick parse (for parse_url(), parse2 = more thorough parse
74             # (for testing)
75             sub _parse_or_request {
76 15     15   24 my ($self, $which, $action, $server_url, $extra) = @_;
77 15         45 $log->tracef("=> %s\::request(action=%s, server_url=%s, extra=%s)",
78             __PACKAGE__, $action, $server_url, $extra);
79 15 50       140 return [400, "Please specify server_url"] unless $server_url;
80              
81 15         14 my ($uri,
82             $cache_key,
83             $host, $port, # tcp
84             $path, # unix & pipe
85             $args # pipe
86             );
87 15         30 my ($srvsch, $srvauth, $srvpath, $srvquery, $srvfrag) =
88             uri_split($server_url);
89 15   100     147 $srvauth //= "";
90 15   50     22 $srvpath //= "";
91 15 100       52 return [400, "Please supply only riap+tcp/riap+unix/riap+pipe URL"]
92             unless $srvsch =~ /\Ariap\+(tcp|unix|pipe)\z/;
93 14 100       52 if ($srvsch eq 'riap+tcp') {
    100          
    50          
94 5 100       14 if ($srvauth =~ m!^(.+):(\d+)$!) {
95 3         9 ($host, $port) = ($1, $2, $3);
96 3         4 $uri = $srvpath;
97 3         11 $cache_key = "tcp:".lc($host).":$port";
98             } else {
99 2         9 return [400, "Invalid riap+tcp URL, please use this format: ".
100             "riap+tcp://host:1234 or riap+tcp://host:1234/uri"];
101             }
102             } elsif ($srvsch eq 'riap+unix') {
103 4 100       17 if ($srvpath =~ m!(.+)/(/.*)!) {
    100          
104 2         9 ($path, $uri) = (uri_unescape($1), $2);
105             } elsif ($srvpath =~ m!(.+)!) {
106 1         4 $path = uri_unescape($1);
107             }
108 4 100       42 unless ($which eq 'parse0') {
109 3 100       6 if (defined($path)) {
110 2 50       81 my $apath = abs_path($path) or
111             return [500, "Can't find absolute path for $path"];
112 2         6 $cache_key = "unix:$apath";
113             } else {
114 1         3 return [400, "Invalid riap+unix URL, please use this format: ".
115             ", e.g.: riap+unix:/path/to/unix/socket or ".
116             "riap+unix:/path/to/unix/socket//uri"];
117             }
118             }
119             } elsif ($srvsch eq 'riap+pipe') {
120 5 100       38 if ($srvpath =~ m!(.+?)//(.*?)/(/.*)!) {
    100          
    100          
121 2         6 ($path, $args, $uri) = (uri_unescape($1), $2, $3);
122             } elsif ($srvpath =~ m!(.+?)//(.*)!) {
123 1         5 ($path, $args) = (uri_unescape($1), $2);
124             } elsif ($srvpath =~ m!(.+)!) {
125 1         4 $path = uri_unescape($1);
126 1         13 $args = '';
127             }
128 5   100     54 $args = [map {uri_unescape($_)} split m!/!, $args // ''];
  6         18  
129 5 100       31 unless ($which eq 'parse0') {
130 4 100       7 if (defined($path)) {
131 3 50       77 my $apath = abs_path($path) or
132             return [500, "Can't find absolute path for $path"];
133 3         9 $cache_key = "pipe:$apath ".join(" ", @$args);
134             } else {
135 1         4 return [400, "Invalid riap+pipe URL, please use this format: ".
136             "riap+pipe:/path/to/prog or ".
137             "riap+pipe:/path/to/prog//arg1/arg2 or ".
138             "riap+pipe:/path/to/prog//arg1/arg2//uri"];
139             }
140             }
141             }
142              
143 10         8 my $req;
144             my $res;
145              
146 10 100       19 unless ($which eq 'parse0') {
147 7   100     11 $req = { v=>$self->{riap_version}, action=>$action, %{$extra // {}} };
  7         37  
148 7   66     23 $uri ||= $req->{uri}; $req->{uri} //= $uri;
  7   66     17  
149 7         23 $res = $self->check_request($req);
150 7 50       190 return $res if $res;
151             }
152              
153 10 50       22 if ($which =~ /parse/) {
154 10         57 return [200, "OK", {
155             args=>$args, host=>$host, path=>$path, port=>$port,
156             scheme=>$srvsch, uri=>$uri,
157             }];
158             }
159              
160 0         0 $log->tracef("Parsed URI, scheme=%s, host=%s, port=%s, path=%s, args=%s, ".
161             "uri=%s", $srvsch, $host, $port, $path, $args, $uri);
162              
163 0         0 require JSON::MaybeXS;
164 0         0 state $json = JSON::MaybeXS->new->allow_nonref;
165              
166 0         0 my $attempts = 0;
167 0         0 my $do_retry;
168             my $e;
169 0         0 while (1) {
170 0         0 $do_retry = 0;
171              
172 0         0 my ($in, $out);
173 0         0 my $cache = $self->{_conns}{$cache_key};
174             # check cache staleness
175 0 0       0 if ($cache) {
176 0 0       0 if ($srvsch =~ /tcp|unix/) {
177 0 0       0 if ($cache->{socket}->connected) {
178 0         0 $in = $out = $cache->{socket};
179             } else {
180 0         0 $log->infof("Stale socket cache (%s), discarded",
181             $cache_key);
182 0         0 $cache = undef;
183             }
184             } else {
185 0 0       0 if (kill(0, $cache->{pid})) {
186 0         0 $in = $cache->{chld_out};
187 0         0 $out = $cache->{chld_in};
188             } else {
189 0         0 $log->infof(
190             "Process (%s) seems dead/unsignalable, discarded",
191             $cache_key);
192 0         0 $cache = undef;
193             }
194             }
195             }
196             # connect
197 0 0       0 if (!$cache) {
198 0 0       0 if ($srvsch =~ /tcp|unix/) {
199 0         0 my $sock;
200 0 0       0 if ($srvsch eq 'riap+tcp') {
201 0         0 require IO::Socket::INET;
202 0         0 $sock = IO::Socket::INET->new(
203             PeerHost => $host,
204             PeerPort => $port,
205             Proto => 'tcp',
206             );
207             } else {
208 1     1   1925 use IO::Socket::UNIX;
  1         9005  
  1         4  
209 0         0 $sock = IO::Socket::UNIX->new(
210             Type => SOCK_STREAM,
211             Peer => $path,
212             );
213             }
214 0         0 $e = $@;
215 0 0       0 if ($sock) {
216 0         0 $self->{_conns}{$cache_key} = {socket=>$sock};
217 0         0 $in = $out = $sock;
218             } else {
219 0 0       0 $e = $srvsch eq 'riap+tcp' ?
220             "Can't connect to TCP socket $host:$port: $e" :
221             "Can't connect to Unix socket $path: $e";
222 0         0 $do_retry++; goto RETRY;
  0         0  
223             }
224             } else {
225             # taken from Modern::Perl. enable methods on filehandles;
226             # unnecessary when 5.14 autoloads them
227 0         0 require IO::File;
228 0         0 require IO::Handle;
229              
230 0         0 require IPC::Open2;
231              
232 0         0 require String::ShellQuote;
233             my $cmd = $path . (@$args ? " " . join(" ", map {
234 0 0       0 String::ShellQuote::shell_quote($_) } @$args) : "");
  0         0  
235 0         0 $log->tracef("executing cmd: %s", $cmd);
236              
237             # using shell
238             #my $pid = IPC::Open2::open2($in, $out, $cmd, @$args);
239              
240             # not using shell
241 0         0 my $pid = IPC::Open2::open2($in, $out, $path, @$args);
242              
243 0 0       0 if ($pid) {
244 0         0 $self->{_conns}{$cache_key} = {
245             pid=>$pid, chld_out=>$in, chld_in=>$out};
246             } else {
247 0         0 $e = "Can't open2 $cmd: $!";
248 0         0 $do_retry++; goto RETRY;
  0         0  
249             }
250             }
251             }
252              
253 0         0 my $req_json;
254 0         0 eval { $req_json = $json->encode($req) };
  0         0  
255 0         0 $e = $@;
256 0 0       0 return [400, "Can't encode request as JSON: $e"] if $e;
257              
258 0         0 $out->write("j$req_json\015\012");
259 0         0 $log->tracef("Sent request to server: %s", $req_json);
260              
261             # XXX alarm/timeout
262 0         0 my $line = $in->getline;
263 0         0 $log->tracef("Got line from server: %s", $line);
264 0 0       0 if (!$line) {
    0          
265 0         0 $self->_delete_cache($cache_key);
266 0         0 return [500, "Empty response from server"];
267             } elsif ($line !~ /^j(.+)/) {
268 0         0 $self->_delete_cache($cache_key);
269 0         0 return [500, "Invalid response line from server: $line"];
270             }
271 0         0 eval { $res = $json->decode($1) };
  0         0  
272 0         0 $e = $@;
273 0 0       0 if ($e) {
274 0         0 $self->_delete_cache($cache_key);
275 0         0 return [500, "Invalid JSON response from server: $e"];
276             }
277 0         0 strip_riap_stuffs_from_res($res);
278 0         0 return $res;
279              
280             RETRY:
281 0 0 0     0 if ($do_retry && $attempts++ < $self->{retries}) {
282 0         0 $log->tracef("Request failed ($e), waiting to retry #%s...",
283             $attempts);
284 0         0 sleep $self->{retry_delay};
285             } else {
286 0         0 last;
287             }
288             }
289 0         0 return [500, "$e (tried $attempts times)"];
290             }
291              
292             sub request_tcp {
293 0     0 1 0 my ($self, $action, $hostport, $extra) = @_;
294 0         0 $self->request($action, "riap+tcp://$hostport->[0]:$hostport->[1]", $extra);
295             }
296              
297             sub request_unix {
298 0     0 1 0 my ($self, $action, $sockpath, $extra) = @_;
299 0         0 $self->request($action => "riap+unix:" . uri_escape($sockpath), $extra);
300             }
301              
302             sub request_pipe {
303 0     0 1 0 my ($self, $action, $cmd, $extra) = @_;
304             $self->request($action => "riap+pipe:" . uri_escape($cmd->[0]) . "//" .
305 0         0 join("/", map {uri_escape($_)} @$cmd[1..@$cmd-1]),
  0         0  
306             $extra);
307             }
308              
309             sub parse_url {
310 3     3 1 10 my ($self, $uri) = @_;
311              
312 3         7 my $res0 = $self->_parse_or_request('parse0', 'dummy', $uri);
313             #use Data::Dump; dd $res0;
314 3 50       8 die "Can't parse URL $uri: $res0->[0] - $res0->[1]" unless $res0->[0]==200;
315 3         4 $res0 = $res0->[2];
316 3         8 my $res = {proto=>$res0->{scheme}, path=>$res0->{uri}};
317 3 100       11 if ($res->{proto} eq 'riap+unix') {
    100          
    50          
318 1         2 $res->{unix_sock_path} = $res0->{path};
319             } elsif ($res->{proto} eq 'riap+tcp') {
320 1         2 $res->{host} = $res0->{host};
321 1         2 $res->{port} = $res0->{port};
322             } elsif ($res->{proto} eq 'riap+pipe') {
323 1         2 $res->{prog_path} = $res0->{path};
324 1         2 $res->{args} = $res0->{args};
325             }
326              
327 3         18 $res;
328             }
329              
330             1;
331             # ABSTRACT: Riap::Simple client
332              
333             __END__