File Coverage

blib/lib/Perinci/Access/Simple/Client.pm
Criterion Covered Total %
statement 102 192 53.1
branch 40 86 46.5
condition 14 23 60.8
subroutine 17 23 73.9
pod 6 6 100.0
total 179 330 54.2


line stmt bran cond sub pod time code
1             package Perinci::Access::Simple::Client;
2              
3             our $DATE = '2017-07-03'; # DATE
4             our $VERSION = '0.23'; # VERSION
5              
6 1     1   21352 use 5.010001;
  1         4  
7 1     1   4 use strict;
  1         3  
  1         16  
8 1     1   4 use warnings;
  1         1  
  1         23  
9 1     1   3307 use Log::ger;
  1         75  
  1         4  
10              
11 1     1   970 use Cwd qw(abs_path);
  1         2  
  1         43  
12 1     1   361 use Perinci::AccessUtil qw(strip_riap_stuffs_from_res);
  1         1450  
  1         72  
13 1     1   578 use POSIX qw(:sys_wait_h);
  1         7113  
  1         9  
14 1     1   2304 use Tie::Cache;
  1         2700  
  1         38  
15 1     1   519 use URI::Split qw(uri_split);
  1         2796  
  1         58  
16 1     1   14 use URI::Escape;
  1         2  
  1         42  
17              
18 1     1   5 use parent qw(Perinci::Access::Base);
  1         2  
  1         7  
19              
20             sub new {
21 2     2 1 2798 my $class = shift;
22              
23 2         16 my $self = $class->SUPER::new(@_);
24              
25             # attributes
26 2   50     35 $self->{retries} //= 2;
27 2   50     11 $self->{retry_delay} //= 3;
28 2   50     11 $self->{conn_cache_size} //= 32;
29              
30             # connection cache, key="tcp:HOST:PORT" OR "unix:ABSPATH" or "pipe:ABSPATH
31             # ARGS". value=hash, for tcp & unix {socket=>...} and for pipe {pid=>...,
32             # chld_out=>..., chld_in=>...}
33 2         17 tie my(%conns), 'Tie::Cache', $self->{conn_cache_size};
34 2         66 $self->{_conns} = \%conns;
35              
36 2         6 $self;
37             }
38              
39             # for older Perinci::Access::Base 0.28-, to remove later
40       0     sub _init {}
41              
42             sub _delete_cache {
43 0     0   0 my ($self, $wanted) = @_;
44 0         0 my $conns = $self->{_conns};
45 0 0       0 return unless $conns;
46              
47 0 0       0 for my $k ($wanted ? ($wanted) : (keys %$conns)) {
48 0 0       0 if ($k =~ /^pipe:/) {
49 0         0 waitpid($conns->{$k}{pid}, WNOHANG);
50             }
51 0         0 delete $self->{_conns}{$k};
52             }
53             }
54              
55             sub DESTROY {
56 1     1   722 my ($self) = @_;
57              
58             #$self->_delete_cache;
59             }
60              
61             sub request {
62 0     0 1 0 my $self = shift;
63 0         0 $self->_parse_or_request('request', @_);
64             }
65              
66             sub _parse {
67 12     12   26999 my $self = shift;
68 12         42 $self->_parse_or_request('parse2', @_);
69             }
70              
71             # which: parse0 = quick parse (for parse_url(), parse2 = more thorough parse
72             # (for testing)
73             sub _parse_or_request {
74 15     15   68 my ($self, $which, $action, $server_url, $extra) = @_;
75 15         69 log_trace("=> %s\::request(action=%s, server_url=%s, extra=%s)",
76             __PACKAGE__, $action, $server_url, $extra);
77 15 50       73 return [400, "Please specify server_url"] unless $server_url;
78              
79 15         30 my ($uri,
80             $cache_key,
81             $host, $port, # tcp
82             $path, # unix & pipe
83             $args # pipe
84             );
85 15         51 my ($srvsch, $srvauth, $srvpath, $srvquery, $srvfrag) =
86             uri_split($server_url);
87 15   100     206 $srvauth //= "";
88 15   50     39 $srvpath //= "";
89 15 100       75 return [400, "Please supply only riap+tcp/riap+unix/riap+pipe URL"]
90             unless $srvsch =~ /\Ariap\+(tcp|unix|pipe)\z/;
91 14 100       57 if ($srvsch eq 'riap+tcp') {
    100          
    50          
92 5 100       25 if ($srvauth =~ m!^(.+):(\d+)$!) {
93 3         27 ($host, $port) = ($1, $2, $3);
94 3         8 $uri = $srvpath;
95 3         18 $cache_key = "tcp:".lc($host).":$port";
96             } else {
97 2         11 return [400, "Invalid riap+tcp URL, please use this format: ".
98             "riap+tcp://host:1234 or riap+tcp://host:1234/uri"];
99             }
100             } elsif ($srvsch eq 'riap+unix') {
101 4 100       25 if ($srvpath =~ m!(.+)/(/.*)!) {
    100          
102 2         13 ($path, $uri) = (uri_unescape($1), $2);
103             } elsif ($srvpath =~ m!(.+)!) {
104 1         7 $path = uri_unescape($1);
105             }
106 4 100       54 unless ($which eq 'parse0') {
107 3 100       12 if (defined($path)) {
108 2 50       75 my $apath = abs_path($path) or
109             return [500, "Can't find absolute path for $path"];
110 2         9 $cache_key = "unix:$apath";
111             } else {
112 1         8 return [400, "Invalid riap+unix URL, please use this format: ".
113             ", e.g.: riap+unix:/path/to/unix/socket or ".
114             "riap+unix:/path/to/unix/socket//uri"];
115             }
116             }
117             } elsif ($srvsch eq 'riap+pipe') {
118 5 100       37 if ($srvpath =~ m!(.+?)//(.*?)/(/.*)!) {
    100          
    100          
119 2         8 ($path, $args, $uri) = (uri_unescape($1), $2, $3);
120             } elsif ($srvpath =~ m!(.+?)//(.*)!) {
121 1         5 ($path, $args) = (uri_unescape($1), $2);
122             } elsif ($srvpath =~ m!(.+)!) {
123 1         4 $path = uri_unescape($1);
124 1         17 $args = '';
125             }
126 5   100     64 $args = [map {uri_unescape($_)} split m!/!, $args // ''];
  6         31  
127 5 100       44 unless ($which eq 'parse0') {
128 4 100       11 if (defined($path)) {
129 3 50       79 my $apath = abs_path($path) or
130             return [500, "Can't find absolute path for $path"];
131 3         13 $cache_key = "pipe:$apath ".join(" ", @$args);
132             } else {
133 1         5 return [400, "Invalid riap+pipe URL, please use this format: ".
134             "riap+pipe:/path/to/prog or ".
135             "riap+pipe:/path/to/prog//arg1/arg2 or ".
136             "riap+pipe:/path/to/prog//arg1/arg2//uri"];
137             }
138             }
139             }
140              
141 10         25 my $req;
142             my $res;
143              
144 10 100       28 unless ($which eq 'parse0') {
145 7   100     19 $req = { v=>$self->{riap_version}, action=>$action, %{$extra // {}} };
  7         48  
146 7   66     40 $uri ||= $req->{uri}; $req->{uri} //= $uri;
  7   66     31  
147 7         35 $res = $self->check_request($req);
148 7 50       299 return $res if $res;
149             }
150              
151 10 50       38 if ($which =~ /parse/) {
152 10         83 return [200, "OK", {
153             args=>$args, host=>$host, path=>$path, port=>$port,
154             scheme=>$srvsch, uri=>$uri,
155             }];
156             }
157              
158 0         0 log_trace("Parsed URI, scheme=%s, host=%s, port=%s, path=%s, args=%s, ".
159             "uri=%s", $srvsch, $host, $port, $path, $args, $uri);
160              
161 0         0 require JSON::MaybeXS;
162 0         0 state $json = JSON::MaybeXS->new->allow_nonref;
163              
164 0         0 my $attempts = 0;
165 0         0 my $do_retry;
166             my $e;
167 0         0 while (1) {
168 0         0 $do_retry = 0;
169              
170 0         0 my ($in, $out);
171 0         0 my $cache = $self->{_conns}{$cache_key};
172             # check cache staleness
173 0 0       0 if ($cache) {
174 0 0       0 if ($srvsch =~ /tcp|unix/) {
175 0 0       0 if ($cache->{socket}->connected) {
176 0         0 $in = $out = $cache->{socket};
177             } else {
178 0         0 log_info("Stale socket cache (%s), discarded",
179             $cache_key);
180 0         0 $cache = undef;
181             }
182             } else {
183 0 0       0 if (kill(0, $cache->{pid})) {
184 0         0 $in = $cache->{chld_out};
185 0         0 $out = $cache->{chld_in};
186             } else {
187 0         0 log_info(
188             "Process (%s) seems dead/unsignalable, discarded",
189             $cache_key);
190 0         0 $cache = undef;
191             }
192             }
193             }
194             # connect
195 0 0       0 if (!$cache) {
196 0 0       0 if ($srvsch =~ /tcp|unix/) {
197 0         0 my $sock;
198 0 0       0 if ($srvsch eq 'riap+tcp') {
199 0         0 require IO::Socket::INET;
200 0         0 $sock = IO::Socket::INET->new(
201             PeerHost => $host,
202             PeerPort => $port,
203             Proto => 'tcp',
204             );
205             } else {
206 1     1   1972 use IO::Socket::UNIX;
  1         9128  
  1         5  
207 0         0 $sock = IO::Socket::UNIX->new(
208             Type => SOCK_STREAM,
209             Peer => $path,
210             );
211             }
212 0         0 $e = $@;
213 0 0       0 if ($sock) {
214 0         0 $self->{_conns}{$cache_key} = {socket=>$sock};
215 0         0 $in = $out = $sock;
216             } else {
217 0 0       0 $e = $srvsch eq 'riap+tcp' ?
218             "Can't connect to TCP socket $host:$port: $e" :
219             "Can't connect to Unix socket $path: $e";
220 0         0 $do_retry++; goto RETRY;
  0         0  
221             }
222             } else {
223             # taken from Modern::Perl. enable methods on filehandles;
224             # unnecessary when 5.14 autoloads them
225 0         0 require IO::File;
226 0         0 require IO::Handle;
227              
228 0         0 require IPC::Open2;
229              
230 0         0 require String::ShellQuote;
231             my $cmd = $path . (@$args ? " " . join(" ", map {
232 0 0       0 String::ShellQuote::shell_quote($_) } @$args) : "");
  0         0  
233 0         0 log_trace("executing cmd: %s", $cmd);
234              
235             # using shell
236             #my $pid = IPC::Open2::open2($in, $out, $cmd, @$args);
237              
238             # not using shell
239 0         0 my $pid = IPC::Open2::open2($in, $out, $path, @$args);
240              
241 0 0       0 if ($pid) {
242 0         0 $self->{_conns}{$cache_key} = {
243             pid=>$pid, chld_out=>$in, chld_in=>$out};
244             } else {
245 0         0 $e = "Can't open2 $cmd: $!";
246 0         0 $do_retry++; goto RETRY;
  0         0  
247             }
248             }
249             }
250              
251 0         0 my $req_json;
252 0         0 eval { $req_json = $json->encode($req) };
  0         0  
253 0         0 $e = $@;
254 0 0       0 return [400, "Can't encode request as JSON: $e"] if $e;
255              
256 0         0 $out->write("j$req_json\015\012");
257 0         0 log_trace("Sent request to server: %s", $req_json);
258              
259             # XXX alarm/timeout
260 0         0 my $line = $in->getline;
261 0         0 log_trace("Got line from server: %s", $line);
262 0 0       0 if (!$line) {
    0          
263 0         0 $self->_delete_cache($cache_key);
264 0         0 return [500, "Empty response from server"];
265             } elsif ($line !~ /^j(.+)/) {
266 0         0 $self->_delete_cache($cache_key);
267 0         0 return [500, "Invalid response line from server: $line"];
268             }
269 0         0 eval { $res = $json->decode($1) };
  0         0  
270 0         0 $e = $@;
271 0 0       0 if ($e) {
272 0         0 $self->_delete_cache($cache_key);
273 0         0 return [500, "Invalid JSON response from server: $e"];
274             }
275 0         0 strip_riap_stuffs_from_res($res);
276 0         0 return $res;
277              
278             RETRY:
279 0 0 0     0 if ($do_retry && $attempts++ < $self->{retries}) {
280 0         0 log_trace("Request failed ($e), waiting to retry #%s...",
281             $attempts);
282 0         0 sleep $self->{retry_delay};
283             } else {
284 0         0 last;
285             }
286             }
287 0         0 return [500, "$e (tried $attempts times)"];
288             }
289              
290             sub request_tcp {
291 0     0 1 0 my ($self, $action, $hostport, $extra) = @_;
292 0         0 $self->request($action, "riap+tcp://$hostport->[0]:$hostport->[1]", $extra);
293             }
294              
295             sub request_unix {
296 0     0 1 0 my ($self, $action, $sockpath, $extra) = @_;
297 0         0 $self->request($action => "riap+unix:" . uri_escape($sockpath), $extra);
298             }
299              
300             sub request_pipe {
301 0     0 1 0 my ($self, $action, $cmd, $extra) = @_;
302             $self->request($action => "riap+pipe:" . uri_escape($cmd->[0]) . "//" .
303 0         0 join("/", map {uri_escape($_)} @$cmd[1..@$cmd-1]),
  0         0  
304             $extra);
305             }
306              
307             sub parse_url {
308 3     3 1 13 my ($self, $uri) = @_;
309              
310 3         8 my $res0 = $self->_parse_or_request('parse0', 'dummy', $uri);
311             #use Data::Dump; dd $res0;
312 3 50       10 die "Can't parse URL $uri: $res0->[0] - $res0->[1]" unless $res0->[0]==200;
313 3         6 $res0 = $res0->[2];
314 3         10 my $res = {proto=>$res0->{scheme}, path=>$res0->{uri}};
315 3 100       13 if ($res->{proto} eq 'riap+unix') {
    100          
    50          
316 1         3 $res->{unix_sock_path} = $res0->{path};
317             } elsif ($res->{proto} eq 'riap+tcp') {
318 1         2 $res->{host} = $res0->{host};
319 1         2 $res->{port} = $res0->{port};
320             } elsif ($res->{proto} eq 'riap+pipe') {
321 1         3 $res->{prog_path} = $res0->{path};
322 1         2 $res->{args} = $res0->{args};
323             }
324              
325 3         20 $res;
326             }
327              
328             1;
329             # ABSTRACT: Riap::Simple client
330              
331             __END__