File Coverage

lib/RPC/Switch/Client/Tiny.pm
Criterion Covered Total %
statement 363 453 80.1
branch 188 292 64.3
condition 32 69 46.3
subroutine 38 44 86.3
pod 5 22 22.7
total 626 880 71.1


line stmt bran cond sub pod time code
1             # Lightweight client for the RPC-Switch json-rpc request multiplexer
2             #
3             # see: RPC::Switch: https://github.com/a6502/rpc-switch
4             # RPC::Switch::Client: https://metacpan.org/pod/RPC::Switch::Client
5             #
6             package RPC::Switch::Client::Tiny;
7              
8 20     20   2195276 use strict;
  20         212  
  20         562  
9 20     20   119 use warnings;
  20         40  
  20         502  
10 20     20   100 use Carp 'croak';
  20         41  
  20         1014  
11 20     20   808 use JSON;
  20         8897  
  20         100  
12 20     20   11265 use IO::Select;
  20         37230  
  20         926  
13 20     20   15486 use IO::Socket::SSL;
  20         1261182  
  20         198  
14 20     20   14665 use Time::HiRes qw(time);
  20         26223  
  20         85  
15 20     20   12387 use RPC::Switch::Client::Tiny::Error;
  20         59  
  20         669  
16 20     20   7928 use RPC::Switch::Client::Tiny::Netstring;
  20         59  
  20         1436  
17 20     20   8481 use RPC::Switch::Client::Tiny::Async;
  20         79  
  20         629  
18 20     20   8517 use RPC::Switch::Client::Tiny::SessionCache;
  20         59  
  20         114513  
19              
20             our $VERSION = '1.66_01';
21              
22             sub new {
23 234     234 1 421105 my ($class, %args) = @_;
24 234 50       1771 my $s = $args{sock} or croak __PACKAGE__ . " expects sock";
25 234 50       1009 unless ($^O eq 'MSWin32') { # cpantester: strawberry perl does not support blocking() call
26 234 50       1351 defined(my $b = $s->blocking()) or croak __PACKAGE__ . " bad socket: $!";
27 234 100       3003 unless ($b) { croak __PACKAGE__ . " nonblocking socket not supported"; }
  19         5054  
28             }
29 215 50       1255 unless (exists $args{who}) { croak __PACKAGE__ . " expects who"; }
  0         0  
30 215         3685 my $self = bless {
31             %args,
32             id => 1, # next request id
33             state => '', # last rpcswitch.type
34             reqs => {}, # outstanding requests
35             channels => {}, # open rpcswitch channels
36             methods => {}, # defined worker methods
37             announced => {}, # announced worker methods
38             msglimit => 999999, # max netstring size
39             }, $class;
40 215 50       1689 if (ref($self->{sock}) eq 'IO::Socket::SSL') {
41 0 0       0 $self->{auth_method} = 'clientcert' unless exists $self->{auth_method};
42 0 0       0 $self->{token} = $self->{who} unless exists $self->{token}; # should be optional for clientcert
43             } else {
44 215 50       1529 $self->{auth_method} = 'password' unless exists $self->{auth_method};
45             }
46 215 100       1090 $self->{json_utf8} = $self->{client_encoding_utf8} ? {} : {utf8 => 1};
47 215         1947 return $self;
48             }
49              
50             sub rpc_error {
51 76     76 0 1520 return RPC::Switch::Client::Tiny::Error->new(@_);
52             }
53              
54             sub rpc_send {
55 863     863 0 3389 my ($self, $msg) = @_;
56 863         1808 my $s = $self->{sock};
57 863         2582 my $len = length($s);
58 863 50 33     8069 if ($self->{msglimit} && ($len > $self->{msglimit})) {
59 0         0 warn "rpc_send msglimit exceeded: $len > $self->{msglimit}";
60 0         0 return;
61             }
62 863         4286 $msg->{jsonrpc} = '2.0';
63 863         1642 my $str = to_json($msg, {canonical => 1, %{$self->{json_utf8}}});
  863         4841  
64 863 100       27876 $self->{trace_cb}->('SND', $msg) if $self->{trace_cb};
65 863         11593 return netstring_write($s, $str);
66             }
67              
68             sub rpc_send_req {
69 472     472 0 1406 my ($self, $method, $msg) = @_;
70 472         1860 my $id = "$self->{id}"; $self->{id}++;
  472         1197  
71 472         1375 $msg->{id} = $id;
72 472         1202 $msg->{method} = $method;
73 472         2343 $self->{reqs}{$id} = $method;
74 472 100       3266 $self->{state} = $method if $method =~ /^rpcswitch\./;
75 472 100       1557 $self->rpc_send($msg) or return;
76 467         1576 return $id;
77             }
78              
79             sub rpc_send_call {
80 176     176 0 2629 my ($self, $method, $params, $reqauth) = @_;
81              
82 176 50       552 if (defined $reqauth) { # request authentication
83             # Without the vcookie the rpcswitch does not validate
84             # the reqauth parameter.
85             # The vcookie 'eatme' value is hardcoded in the rpc-switch
86             # code and is called 'channel information version'.
87             #
88 0         0 return $self->rpc_send_req($method, {params => $params, rpcswitch => {vcookie => 'eatme', reqauth => $reqauth}});
89             } else {
90 176         762 return $self->rpc_send_req($method, {params => $params});
91             }
92             }
93              
94             sub rpc_decode {
95 849     849 0 2545 my ($self, $msg) = @_;
96 849         4631 my ($req, $rsp) = ('', '');
97              
98 849 50 66     5962 unless (($msg->{jsonrpc} eq '2.0') && (exists $msg->{id} || exists $msg->{method})) {
      66        
99 0         0 die rpc_error('jsonrpc', "bad json-rpc: ".to_json($msg, {canonical => 1}));
100             }
101 849 100       3802 if (exists $msg->{method}) {
    50          
    50          
102 420         1336 $req = $msg->{method};
103             } elsif (!defined $msg->{id}) {
104 0         0 die rpc_error('jsonrpc', "bad response id: ".to_json($msg, {canonical => 1}));
105             } elsif (!exists $self->{reqs}{$msg->{id}}) {
106 0         0 die rpc_error('jsonrpc', "unknown response $msg->{id}: ".to_json($msg, {canonical => 1}));
107             } else {
108 429         1250 $rsp = delete $self->{reqs}{$msg->{id}};
109              
110 429 50       1119 if (exists $msg->{error}) {
111 0         0 die rpc_error('jsonrpc', "$rsp $msg->{id} response error: $msg->{error}{message}", {code => $msg->{error}{code}});
112             }
113 429 50       1212 if (!exists $msg->{result}) {
114 0         0 die rpc_error('rpcswitch', "$rsp $msg->{id} response error: result missing");
115             }
116 429 50 100     1751 if ((ref($msg->{result}) ne 'ARRAY') && ($rsp ne 'rpcswitch.ping') && ($rsp ne 'rpcswitch.withdraw')) {
      66        
117 0         0 die rpc_error('rpcswitch', "$rsp $msg->{id} bad response: $msg->{result}");
118             }
119             }
120 849         3534 return ($req, $rsp);
121             }
122              
123             sub rpc_worker_announce {
124 159     159 0 735 my ($self, $workername) = @_;
125              
126             # ignore repeated announce request or unfinished withdraw
127             #
128 159 100       395 return if (keys %{$self->{announced}});
  159         647  
129              
130 130         311 foreach my $method (keys %{$self->{methods}}) {
  130         714  
131 145 100       472 next if exists $self->{methods}{$method}{id}; # active announce/withdraw request
132              
133 135         628 my $params = {method => $method, workername => $workername, doc => $self->{methods}{$method}{doc}};
134 135 50       437 $params->{filter} = $self->{methods}{$method}{filter} if exists $self->{methods}{$method}{filter};
135 135         530 my $id = $self->rpc_send_req('rpcswitch.announce', {params => $params});
136 135 50       639 die rpc_error('io', 'netstring_write') unless defined $id;
137 135         620 $self->{methods}{$method}{id} = $id;
138             }
139 130         329 return;
140             }
141              
142             sub rpc_worker_withdraw {
143 31     31 0 237 my ($self) = @_;
144              
145             # callers will get code -32006 'opposite end of channel gone'
146             # errors when the announcement is withdrawn.
147             #
148 31         126 foreach my $method (keys %{$self->{announced}}) {
  31         238  
149 16 100       144 next if exists $self->{methods}{$method}{id}; # active announce/withdraw request
150              
151 8         72 my $params = {method => $method};
152 8 50       144 $params->{filter} = $self->{methods}{$method}{filter} if exists $self->{methods}{$method}{filter};
153 8         144 my $id = $self->rpc_send_req('rpcswitch.withdraw', {params => $params});
154 8 50       144 die rpc_error('io', 'netstring_write') unless defined $id;
155 8         112 $self->{methods}{$method}{id} = $id;
156             }
157 31         109 return;
158             }
159              
160             sub rpc_worker_flowcontrol {
161 581     581 0 2479 my ($self, $workername) = @_;
162              
163             # need to be in connected auth state
164 581 100 100     5193 return unless ($self->{state} && ($self->{state} ne 'rpcswitch.hello'));
165              
166 449 100       1157 if ($self->{flowcontrol}) {
167 102         272 my $cnt = (scalar keys %{$self->{async}{jobs}}) + (scalar @{$self->{async}{jobqueue}});
  102         259  
  102         292  
168             #printf ">> flow: %d %d %d\n", $cnt, $self->{async}{max_async} * 2, $self->{async}{max_async};
169 102 100       804 if ($cnt >= $self->{async}{max_async} * 2) {
    100          
170 31         353 $self->rpc_worker_withdraw();
171             } elsif ($cnt < $self->{async}{max_async}) {
172 44         320 $self->rpc_worker_announce($workername);
173             }
174             }
175 449         865 return;
176             }
177              
178             sub valid_worker_err {
179 19     19 0 304 my ($err) = @_;
180 19 50       1083 $err = {text => $err} unless ref($err); # convert plain errors
181 19 50       133 $err->{class} = 'hard' unless exists $err->{class};
182 19         190 return $err;
183             }
184              
185             sub rpcswitch_resp {
186 244     244 0 515 my ($rpcswitch) = @_;
187              
188             # Just the vcookie & vci-channel parameters are required by
189             # the rpcswitch. The worker_id field is optional, and might
190             # be set to the worker_id returned by the announce response.
191             #
192 244         1031 $rpcswitch = {vcookie => $rpcswitch->{vcookie}, vci => $rpcswitch->{vci}};
193             #$rpcswitch = {vcookie => $rpcswitch->{vcookie}, vci => $rpcswitch->{vci}, worker_id => $rpcswitch->{worker_id}};
194 244         489 return $rpcswitch;
195             }
196              
197             sub client {
198 228     228 1 3002 my ($self, $msg, $method, $params, $reqauth) = @_;
199 228         1064 my ($req, $rsp) = $self->rpc_decode($msg);
200              
201 228 100       1615 if ($req eq 'rpcswitch.greetings') {
    100          
    100          
    100          
    50          
    50          
202 38 50       190 my %token = $self->{token} ? (token => $self->{token}) : (); # should be optional for clientcert
203 38         209 my $helloparams = {who => $self->{who}, %token, method => $self->{auth_method}};
204 38         456 $self->rpc_send_req('rpcswitch.hello', {params => $helloparams});
205             } elsif ($rsp eq 'rpcswitch.hello') {
206 19 50       1976 if (!$msg->{result}[0]) {
207 0         0 die rpc_error('rpcswitch', "$rsp failed: $msg->{result}[1]");
208             }
209 19         874 $self->rpc_send_call($method, $params, $reqauth);
210             } elsif ($rsp eq 'rpcswitch.ping') {
211 19         266 return [$msg->{result}]; # ping complete
212             } elsif ($rsp eq $method) {
213 133 50       836 if (exists $msg->{rpcswitch}) { # internal rpcswitch methods have no channel
214 133         627 $self->{channels}{$msg->{rpcswitch}{vci}} = 0; # wait for channel_gone
215             }
216 133 100       1254 if ($msg->{result}[0] eq 'RES_WAIT') { # async worker notification (might use trace_cb to dump)
    100          
    50          
217 19         361 $self->{channels}{$msg->{rpcswitch}{vci}} = $msg->{result}[1]; # msg id
218             } elsif ($msg->{result}[0] eq 'RES_ERROR') { # worker error
219 19         266 my $e = valid_worker_err($msg->{result}[1]);
220 19         247 die rpc_error('worker', to_json($e), $e);
221             } elsif ($msg->{result}[0] eq 'RES_OK') {
222 95         342 return [@{$msg->{result}}[1..$#{$msg->{result}}]]; # client result[1..$]
  95         570  
  95         380  
223             }
224             } elsif ($req eq 'rpcswitch.result') {
225 0         0 my $channel = $msg->{rpcswitch}{vci};
226 0 0 0     0 if (($msg->{params}[0] eq 'RES_OK') && ($msg->{params}[1] eq $self->{channels}{$channel})) {
    0 0        
227 0         0 return [@{$msg->{params}}[2..$#{$msg->{params}}]]; # client result[2..$] (notification)
  0         0  
  0         0  
228             } elsif (($msg->{params}[0] eq 'RES_ERROR') && ($msg->{params}[1] eq $self->{channels}{$channel})) {
229 0         0 my $e = valid_worker_err($msg->{params}[2]);
230 0         0 die rpc_error('worker', to_json($e), $e);
231             }
232 0         0 die rpc_error('rpcswitch', "bad msg: $msg->{params}[0] $msg->{params}[1]");
233             } elsif ($req eq 'rpcswitch.channel_gone') {
234 19         76 my $channel = $msg->{params}{channel};
235 19 50       76 if (exists $self->{channels}{$channel}) {
236 19         76 my $id = delete $self->{channels}{$channel};
237 19         133 die rpc_error('rpcswitch', "$req for request $id: $channel");
238             }
239 0         0 die rpc_error('rpcswitch', "$req for unknown request: $channel");
240             } else {
241 0         0 die rpc_error('rpcswitch', "unsupported msg: ".to_json($msg, {canonical => 1}));
242             }
243 76         361 return;
244             }
245              
246             sub worker {
247 621     621 1 2187 my ($self, $msg, $workername) = @_;
248 621         3866 my ($req, $rsp) = $self->rpc_decode($msg);
249              
250 621 100       2876 if ($req eq 'rpcswitch.greetings') {
    100          
    100          
    50          
    100          
    50          
    0          
251 115 50       561 my %token = $self->{token} ? (token => $self->{token}) : (); # should be optional for clientcert
252 115         538 my $helloparams = {who => $self->{who}, %token, method => $self->{auth_method}};
253 115         514 $self->rpc_send_req('rpcswitch.hello', {params => $helloparams});
254             } elsif ($rsp eq 'rpcswitch.hello') {
255 115 50       1090 if (!$msg->{result}[0]) {
256 0         0 die rpc_error('rpcswitch', "$rsp failed: $msg->{result}[1]");
257             }
258 115         2198 $self->rpc_worker_announce($workername);
259             } elsif ($rsp eq 'rpcswitch.announce') {
260 135 50       685 if (!$msg->{result}[0]) {
261 0         0 die rpc_error('rpcswitch', "$rsp failed: $msg->{result}[1]");
262             }
263 135 100       1297 my ($method) = grep { exists $self->{methods}{$_}{id} && $self->{methods}{$_}{id} eq $msg->{id} } keys %{$self->{methods}};
  165         2198  
  135         466  
264 135 50       519 if (!defined $method) {
265 0         0 die rpc_error('rpcswitch', "unknown $rsp response $msg->{id}: $msg->{result}[1]");
266             }
267             # register announced method
268 135         463 $self->{announced}{$method}{cb} = $self->{methods}{$method}{cb};
269 135         338 $self->{announced}{$method}{worker_id} = $msg->{result}[1]{worker_id};
270 135         400 delete $self->{methods}{$method}{id};
271             } elsif ($req eq 'rpcswitch.ping') {
272 0         0 $self->rpc_send({id => $msg->{id}, result => 'pong!'});
273             } elsif (exists $self->{announced}{$req}) {
274 248         651 $msg->{rpcswitch}{worker_id} = $self->{announced}{$req}{worker_id}; # save worker_id for response
275              
276 248         648 $self->{channels}{$msg->{rpcswitch}{vci}} = 0; # wait for channel_gone
277              
278 248 100       594 if ($self->{async}) { # use async call for forked childs
279 199         807 $self->{async}->msg_enqueue($msg);
280             } else {
281 49         170 my $rpcswitch = rpcswitch_resp($msg->{rpcswitch});
282 49         117 my @resp = eval { $self->{announced}{$req}{cb}->($msg->{params}, $msg->{rpcswitch}) };
  49         420  
283 49 100       1139 if ($@) {
284 15         255 $self->rpc_send({id => $msg->{id}, result => ['RES_ERROR', $@], rpcswitch => $rpcswitch});
285             } else {
286 34         469 $self->rpc_send({id => $msg->{id}, result => ['RES_OK', @resp], rpcswitch => $rpcswitch});
287             }
288             }
289             } elsif ($rsp eq 'rpcswitch.withdraw') {
290             # Note: the rpcswitch sends just a boolean result here
291             #
292 8 50       184 if (!$msg->{result}) {
293 0         0 die rpc_error('rpcswitch', "$rsp failed: $msg->{result}");
294             }
295 8 50       104 my ($method) = grep { exists $self->{methods}{$_}{id} && $self->{methods}{$_}{id} eq $msg->{id} } keys %{$self->{methods}};
  8         176  
  8         128  
296 8 50       40 if (!defined $method) {
297 0         0 die rpc_error('rpcswitch', "unknown $rsp response $msg->{id}: $msg->{result}");
298             }
299             # remove announced method
300 8         112 delete $self->{announced}{$method};
301 8         32 delete $self->{methods}{$method}{id};
302             } elsif ($req eq 'rpcswitch.channel_gone') {
303 0         0 my $channel = $msg->{params}{channel};
304 0 0       0 if ($self->{async}) {
305 0     0   0 my ($childs, $msgs) = $self->{async}->jobs_terminate('gone', sub { $_[0]->{rpcswitch}{vci} eq $channel });
  0         0  
306 0 0       0 if (@$msgs) {
307 0         0 warn "worker removed queued messages on channel gone: ".join(' ', map { $_->{id} } @$msgs);
  0         0  
308             }
309             }
310 0 0       0 if (exists $self->{channels}{$channel}) {
311 0         0 delete $self->{channels}{$channel};
312             } else {
313 0         0 warn "worker $req for unknown request: $channel";
314             }
315             } else {
316 0         0 warn "worker unsupported msg: ".to_json($msg, {canonical => 1});
317             }
318 621         1881 return;
319             }
320              
321             sub is_session_req {
322 34     34 0 171 my ($self, $params) = @_;
323 34 100       373 return unless $self->{sessioncache};
324              
325 20 50 33     232 if (exists $params->{session} && exists $params->{session}{id}) {
326 20         142 return $params->{session};
327             }
328 0         0 return;
329             }
330              
331             sub is_session_resp {
332 34     34 0 127 my ($self, $params) = @_;
333 34 100       498 return unless $self->{sessioncache};
334              
335 20 50 33     440 if ((ref($params) eq 'ARRAY') && ($params->[0] eq 'RES_OK') && ref($params->[2]) && exists $params->[2]->{set_session}) {
      33        
      33        
336 20         197 return $params->[2]->{set_session};
337             }
338 0         0 return;
339             }
340              
341             sub child_handler {
342 18     18 0 370 my ($self, $wr) = @_;
343              
344             # The child has to explicitly close the ssl-socket without shutdown.
345             # Otherwise the parent will get an EOF.
346             # see: https://metacpan.org/pod/IO::Socket::SSL#Common-Usage-Errors
347             #
348 18 50       482 if (ref($self->{sock}) eq 'IO::Socket::SSL') {
349 0         0 $self->{sock}->close(SSL_no_shutdown => 1);
350             } else {
351 18         582 close($self->{sock});
352             }
353 18         83 $self->{sock} = $wr;
354 18         501 delete $self->{trace_cb};
355 18         810 local $SIG{INT} = 'DEFAULT';
356 18         631 local $SIG{PIPE} = 'IGNORE'; # handle sigpipe via print/write result
357              
358             # When session handling is enabled a child might process
359             # more than one request with the same session_id.
360             #
361 18         273 while (1) {
362 24         946 my $b = eval { netstring_read($self->{sock}) };
  24         739  
363 24 100       214 unless ($b) {
364 4 50 33     28 next if ($@ && ($@ =~ /^EINTR/)); # interrupted
365 4 50       23 die "worker child: $@" if $@;
366 4         12 last; # EOF
367             }
368 20         171 my $msg = eval { from_json($b, {%{$self->{json_utf8}}}) };
  20         121  
  20         799  
369 20 50       2907 die "worker child: $@" if $@;
370              
371             # The client catches all possible die() calls, so that it is
372             # guaranteed to call exit either from here or from a signal handler.
373             #
374 20         232 my $params;
375 20         229 my $callback = $self->{methods}{$msg->{method}}{cb};
376 20         173 my @resp;
377 20         129 eval {
378 20         686 local $SIG{PIPE} = 'DEFAULT'; # reenable sigpipe for worker code
379 20         590 @resp = $callback->($msg->{params}, $msg->{rpcswitch});
380             };
381 20 50       903509 if (my $err = $@) {
382 0         0 $params = ['RES_ERROR', $msg->{id}, $err];
383             } else {
384 20         150 $params = ['RES_OK', $msg->{id}, @resp];
385             }
386 20         82 $b = eval { to_json($params, {%{$self->{json_utf8}}}) };
  20         46  
  20         345  
387 20 50       1279 return 1 if $@; # signal die from json encode
388 20         404 my $res = netstring_write($self->{sock}, $b);
389 20 50       538 return 2 unless $res; # signal socket error
390              
391 20 100 66     204 last unless $self->is_session_resp($params) || $self->is_session_req($msg->{params});
392             }
393 18 100       556 close($self->{sock}) or return 3; # signal errors like broken pipe
394 17         487 return 0;
395             }
396              
397             sub _worker_child_write {
398 177     177   706 my ($self, $child, $msg) = @_;
399              
400 177         342 my $b = to_json($msg, {canonical => 1, %{$self->{json_utf8}}});
  177         6455  
401 177         19927 my $res = netstring_write($child->{reader}, $b); # forward request to worker child
402 177 50       928 die rpc_error('io', 'netstring_write') unless $res;
403 177         613 return;
404             }
405              
406             sub _worker_child_get {
407 195     195   566 my ($self, $msg) = @_;
408              
409             # First try to reuse child for existing session
410             #
411 195 100       517 if (my $sessioncache = $self->{sessioncache}) {
412 20 50       166 if (my $session_req = $self->is_session_req($msg->{params})) {
    0          
413 20 100       183 if (my $child = $sessioncache->session_get($session_req->{id}, $msg->{id}, $msg->{rpcswitch}{vci})) {
414 6         18 return $child;
415             }
416             } elsif ($sessioncache->{session_persist_user}) {
417 0 0       0 if (exists $msg->{params}{$sessioncache->{session_persist_user}}) {
418 0         0 my $user = $msg->{params}{$sessioncache->{session_persist_user}};
419 0 0       0 if (my $child = $sessioncache->session_get_per_user($user, $msg->{id}, $msg->{rpcswitch}{vci})) {
420 0         0 delete $child->{session}; # reused session will be added after session_resp
421 0         0 return $child;
422             }
423             }
424             }
425             }
426 189         1161 my $child = $self->{async}->child_start($self, $msg->{id}, $msg->{rpcswitch}{vci});
427 171         3263 return $child;
428             }
429              
430             sub _worker_childs_dequeue_and_run {
431 599     599   1128 my ($self) = @_;
432              
433 599         2641 while (my $msg = $self->{async}->msg_dequeue()) {
434 195         482 my $id = $msg->{id};
435 195         760 my $rpcswitch_resp = rpcswitch_resp($msg->{rpcswitch});
436              
437 195         435 my $child = eval { $self->_worker_child_get($msg) };
  195         776  
438 177 50       1010 if ($@) {
439 0         0 $self->rpc_send({id => $id, result => ['RES_ERROR', $@], rpcswitch => $rpcswitch_resp});
440             } else {
441 177         471 eval { $self->_worker_child_write($child, $msg) };
  177         1555  
442 177 50       419 if ($@) {
443 0         0 $self->rpc_send({id => $id, result => ['RES_ERROR', $@], rpcswitch => $rpcswitch_resp});
444 0         0 $self->{async}->child_finish($child, 'error');
445             } else {
446 177         7950 $self->rpc_send({id => $id, result => ['RES_WAIT', $id], rpcswitch => $rpcswitch_resp});
447 177         1703 $self->{async}->job_add($child, $msg->{id}, {rpcswitch => $rpcswitch_resp});
448             }
449             }
450             }
451 581         2218 return;
452             }
453              
454             sub _worker_child_read_and_finish {
455 165     165   443 my ($self, $child) = @_;
456              
457 165         231 my $res;
458 165         345 my $b = eval { netstring_read($child->{reader}) };
  165         942  
459 165 100       493 unless ($b) {
460 14 50       210 my $err = $@ ? $@ : 'EOF';
461 14         406 $res = $self->rpc_send({method => 'rpcswitch.result', params => ['RES_ERROR', $child->{id}, $err], rpcswitch => $child->{rpcswitch}});
462 14         1092 $self->{async}->child_finish($child, 'error');
463             } else {
464 151         448 my $params = eval { from_json($b, {%{$self->{json_utf8}}}) };
  151         282  
  151         1523  
465 151 50       8468 if ($@) {
466 0         0 $res = $self->rpc_send({method => 'rpcswitch.result', params => ['RES_ERROR', $child->{id}, $@], rpcswitch => $child->{rpcswitch}});
467 0         0 $self->{async}->child_finish($child, 'error');
468             } else {
469 151         1974 $res = $self->rpc_send({method => 'rpcswitch.result', params => $params, rpcswitch => $child->{rpcswitch}});
470 151 50       717 unless ($res) {
471 0         0 my $err = "result msg limit exceeded: " . length($b);
472 0         0 $res = $self->rpc_send({method => 'rpcswitch.result', params => ['RES_ERROR', $child->{id}, $err], rpcswitch => $child->{rpcswitch}});
473 0         0 $self->{async}->child_finish($child, 'error');
474 0         0 return $res;
475             }
476              
477 151 100       527 if (my $sessioncache = $self->{sessioncache}) {
478 14 50       117 if (my $set_session = $self->is_session_resp($params)) {
479 14 100 66     163 if (!exists $child->{session} || ($child->{session}{id} ne $set_session->{id})) {
480 8         112 $child->{session} = $sessioncache->session_new($set_session);
481 8         68 $sessioncache->expire_insert($child->{session});
482             }
483             }
484              
485 14 100       145 if ($sessioncache->session_put($child)) {
    50          
486 9         15 my $cnt = scalar keys %{$sessioncache->{active}};
  9         132  
487 9 50       36 if ($cnt > $sessioncache->{max_session}) {
488 0 0       0 if ($child = $sessioncache->lru_dequeue()) {
489 0         0 $self->{async}->child_finish($child, 'lru');
490             }
491             }
492 9         18 $child = undef;
493             } elsif (my $idle_child = $sessioncache->session_get_per_user_idle($child)) {
494             # update idle user session with older session_id
495             #
496 0         0 $self->{async}->child_finish($idle_child, 'update');
497              
498 0 0       0 if ($sessioncache->session_put($child)) {
499 0         0 $child = undef;
500             }
501             }
502             }
503 151 100       801 if ($child) {
504 142         682 $self->{async}->child_finish($child, 'done');
505             }
506             }
507             }
508 165         884 return $res;
509             }
510              
511             sub _worker_sessions_expire {
512 599     599   1113 my ($self) = @_;
513 599 100       1817 return unless $self->{sessioncache};
514              
515             # If a job for the expired session is active, the session
516             # will be dropped when session_put() is called after the
517             # job completed.
518             #
519 57         266 while (my $child = $self->{sessioncache}->expired_dequeue()) {
520 0         0 $self->{async}->child_finish($child, 'expired');
521             }
522 57         97 return;
523             }
524              
525             sub rpc_timeout {
526 1088     1088 0 2441 my ($self, $call_timeout) = @_;
527              
528 1088 100 66     5267 if ($call_timeout && (keys %{$self->{reqs}} > 0)) {
  19         418  
529 19         95 return $call_timeout; # for individual client call
530             }
531 1069         2094 return $self->{timeout};
532             }
533              
534             sub rpc_stopped {
535 1106     1106 0 2629 my ($self) = @_;
536              
537 1106 50       4052 if ($self->{stop}) {
538 0 0 0     0 if (($self->{stop} eq 'withdraw') && (keys %{$self->{announced}})) {
  0 0 0     0  
      0        
539 0         0 return; # wait for withdraw to complete
540 0         0 } elsif (($self->{stop} eq 'withdraw') && $self->{async} && (keys %{$self->{async}{jobs}})) {
541 0         0 return; # wait for active jobs to complete
542             }
543 0         0 return 1;
544             }
545             }
546              
547             sub rpc_handler {
548 305     305 0 1450 my ($self, $call_timeout, $handler, @handler_params) = @_;
549              
550             # returns response or throws rpc_error.
551             # returns undef when remote side cleanly closed connection with EOF.
552             #
553 305         1101 while (!$self->rpc_stopped()) {
554 1106         2118 my @pipes = ();
555 1106 100       2951 if ($self->{async}) {
556 599         2181 $self->_worker_sessions_expire();
557 599 50       2381 $self->_worker_childs_dequeue_and_run() unless $self->{stop};
558 581         4131 $self->{async}->childs_reap(nonblock => 1);
559 581         3189 $self->rpc_worker_flowcontrol(@handler_params);
560 581         949 @pipes = map { $_->{reader} } values %{$self->{async}{jobs}};
  439         2680  
  581         3579  
561             }
562 1088         4924 my $timeout = $self->rpc_timeout($call_timeout);
563              
564 1088 100 100     5639 if ($timeout || @pipes) {
565 437         9621 my @ready = IO::Select->new(($self->{sock}, @pipes))->can_read($timeout);
566 437 50 66     13554854 next if (@ready == 0) && $!{EINTR}; # $! is not reset on success
567 437 100       2331 die rpc_error('jsonrpc', 'receive timeout') unless (@ready > 0);
568              
569 418         3123 foreach my $fh (@ready) {
570 460 50 66     3131 if (($fh != $self->{sock}) && $self->{async}) {
571 165 50       1468 unless (exists $self->{async}{jobs}{$fh->fileno}) {
572 0         0 die rpc_error('io', "child pipe not found: ". $fh->fileno);
573             }
574 165         2303 my $child = $self->{async}{jobs}{$fh->fileno};
575 165         1580 $self->{async}->job_rem($child);
576 165         773 my $res = $self->_worker_child_read_and_finish($child);
577             }
578             }
579 418 100       1023 next unless grep { $_ == $self->{sock} } @ready;
  460         3591  
580             }
581              
582             # always block on full messages from rpcswitch
583 946         1795 my $b = eval { netstring_read($self->{sock}) };
  946         3402  
584 946 100       3733 unless ($b) {
585 97 50 33     330 next if ($@ && ($@ =~ /^EINTR/)); # check if stopped
586 97 50       503 die rpc_error('io', $@) if $@;
587 97         535 return; # EOF
588             }
589 849         2375 my $msg = eval { from_json($b, {%{$self->{json_utf8}}}) };
  849         1596  
  849         5613  
590 849 100       30309 die rpc_error('jsonrpc', $@) if $@;
591 830 100       2808 $self->{trace_cb}->('RCV', $msg) if $self->{trace_cb};
592 830         8355 my $res = eval { $handler->($self, $msg, @handler_params) };
  830         2988  
593 830 100       2818 if (my $err = $@) {
594 38 50       608 die $err if ref($err); # forward error
595 0         0 die rpc_error('io', $err);
596             }
597 792 100       4266 if ($res) {
598 114         722 return $res;
599             }
600             }
601 0         0 return; # STOP is checked by caller
602             }
603              
604             sub work {
605 134     134 1 3413 my ($self, $workername, $methods, $opts) = @_;
606              
607             # a write on a shutdown socket should never happen
608             #
609 134     0   3863 local $SIG{'PIPE'} = sub { die "work[$$]: got PIPE!\n" };
  0         0  
610              
611 134         863 foreach my $method (keys %$methods) {
612 130         582 $self->{methods}{$method}{cb} = $methods->{$method}{cb};
613 130 100       483 $self->{methods}{$method}{doc} = (defined $methods->{$method}{doc}) ? $methods->{$method}{doc} : {};
614 130 50       492 $self->{methods}{$method}{filter} = $methods->{$method}{filter} if exists $methods->{$method}{filter};
615             }
616 134 100       461 $opts->{trace_cb} = $self->{trace_cb} if exists $self->{trace_cb};
617 134 100       1927 $self->{async} = RPC::Switch::Client::Tiny::Async->new(%$opts) if $opts->{max_async};
618 134 50       1389 $self->{flowcontrol} = $opts->{flowcontrol} if $opts->{flowcontrol};
619 134 100       709 $self->{sessioncache} = RPC::Switch::Client::Tiny::SessionCache->new(%$opts) if $opts->{max_session};
620 134         794 $self->rpc_handler(0, \&worker, $workername);
621              
622 97 50       388 if ($self->{stop}) {
623 0         0 $self->rpc_worker_withdraw();
624 0         0 $self->{stop} = 'withdraw';
625              
626             # wait some time for withdraw & active jobs to complete
627             #
628 0     0   0 local $SIG{ALRM} = sub { warn "worker child stop timeout\n"; $self->{stop} = 'timeout'; };
  0         0  
  0         0  
629 0         0 alarm($self->{gracetime});
630 0         0 $self->rpc_handler(0, \&worker, $workername);
631 0         0 alarm(0);
632 0         0 die rpc_error('io', 'STOP');
633             }
634 97 100       501 if (my $async = $self->{async}) {
635             # drop stored sessions
636             #
637 48 100       441 if (my $sessioncache = $self->{sessioncache}) {
638 4         35 foreach my $session_id (keys %{$sessioncache->{active}}) {
  4         47  
639 3 50       18 if (my $child = $sessioncache->session_get($session_id)) {
640 3         15 $async->child_finish($child, 'idle');
641             }
642             }
643             }
644             # reap remaining childs
645             #
646 48 50       274 if (keys %{$async->{finished}}) {
  48         403  
647 48     0   2984 local $SIG{ALRM} = sub { warn "worker child wait timeout\n" };
  0         0  
648 48         555 alarm(1); # wait at most for one second
649 48 50       388 unless ($async->childs_reap()) { # blocking
650 0         0 $async->childs_reap(nonblock => 1); # continue nonblocking after timeout
651             }
652 48         1542 alarm(0);
653             }
654              
655             # EOF is only an error here when there are outstanding requests
656             #
657 48     0   1023 my ($stopped, $msgs) = $async->jobs_terminate('stopped', sub { 1 });
  0         0  
658 48         233 my @childs = keys %{$async->{finished}};
  48         200  
659              
660 48         310 $async->childs_kill(); # don't wait here
661              
662 48         195 $async->{jobqueue} = [];
663 48         270 $async->{finished} = {};
664              
665 48 50       210 die rpc_error('io', 'eof while jobs active: '.join(' ', @childs)) if (@childs);
666 48 50       212 die rpc_error('io', 'eof while jobs queued: '.join(' ', @$msgs)) if (@$msgs);
667             }
668 97         2577 return;
669             }
670              
671             sub call {
672 171     171 1 196802 my ($self, $method, $params, $opts) = @_;
673 171         589 my $reqauth = $opts->{reqauth};
674 171         361 my $call_timeout = $opts->{timeout};
675              
676 171 100       855 if ($self->{state} eq 'rpcswitch.hello') { # trigger rpc_send for consecutive requests
677 133         570 $self->rpc_send_call($method, $params, $reqauth);
678             }
679             # EOF is an error here (response missing)
680             #
681 171 50       1197 my $res = $self->rpc_handler($call_timeout, \&client, $method, $params, $reqauth) or die rpc_error('io', 'eof');
682 114 100       798 return wantarray() ? @$res : $res->[0];
683             }
684              
685             # stop() exits an active $client->work() worker handler.
686             #
687             # - work() dies with RPC::Switch::Client::Tiny::Error which
688             # might be {type => 'io', message => 'STOP'}, or any other
689             # error if a non-restartable system call was interrupted.
690             #
691             # - stop() makes no sense for call() (it has rpc_timeout)
692             # - the only way to call stop is from a signal handler.
693             # - if a signal handler is called, non-restartavle perl
694             # system call are interrupted and return $! == EINTR.
695             #
696             # -> this can break an active worker handler and result in
697             # a RES_ERROR-message to the caller if a non-restartable
698             # perl syscall is interrupted.
699             # -> for the async worker mode this should mostly work.
700             # (sysreadfull, rpc print & IO::Select are restartable).
701             # -> stop will just wait for a gracetime of 2 seconds
702             # for active jobs to complete. The remaining jobs
703             # are terminated.
704             #
705             sub stop {
706 0     0 0 0 my ($self, $opts) = @_;
707 0 0       0 $self->{gracetime} = $opts->{gracetime} ? $opts->{gracetime} : 2;
708 0         0 $self->{stop} = 'pending';
709 0         0 return;
710             }
711              
712             # The perl object destroy order is undefined, so $self->{sock}
713             # might already be destroyed and it makes no sense to try to
714             # send RES_ERROR messages for remaining childs.
715             # see: https://perldoc.perl.org/perlobj#Global-Destruction
716             #
717             # So just terminate remaining childs, and let init-process
718             # collect them instead of calling waitpid() here.
719             #
720             # TODO: perl will call DESTROY only when the process exits
721             # cleanly or calls exit(). If the process is killed, perl
722             # calls DESTROY only if a handler for the matching signal
723             # is installed, like: $SIG{'TERM'} = sub { exit; };
724             #
725             # -> so does it make sense to support DESTROY at all,
726             # if there are situations when it is not called?
727             #
728             sub DESTROY {
729 196     196   3300 my ($self) = @_;
730 196 100       1999 $self->{async}->childs_kill() if $self->{async}; # don't wait here
731             }
732              
733             1;
734              
735             __END__