File Coverage

lib/RPC/Switch/Client/Tiny.pm
Criterion Covered Total %
statement 363 453 80.1
branch 188 292 64.3
condition 32 69 46.3
subroutine 38 44 86.3
pod 5 22 22.7
total 626 880 71.1


line stmt bran cond sub pod time code
1             # Lightweight client for the RPC-Switch json-rpc request multiplexer
2             #
3             # see: RPC::Switch: https://github.com/a6502/rpc-switch
4             # RPC::Switch::Client: https://metacpan.org/pod/RPC::Switch::Client
5             #
6             package RPC::Switch::Client::Tiny;
7              
8 20     20   2165687 use strict;
  20         217  
  20         561  
9 20     20   100 use warnings;
  20         41  
  20         520  
10 20     20   99 use Carp 'croak';
  20         40  
  20         999  
11 20     20   858 use JSON;
  20         8171  
  20         100  
12 20     20   11581 use IO::Select;
  20         35863  
  20         1567  
13 20     20   16800 use IO::Socket::SSL;
  20         1259498  
  20         179  
14 20     20   13940 use Time::HiRes qw(time);
  20         25919  
  20         105  
15 20     20   12689 use RPC::Switch::Client::Tiny::Error;
  20         41  
  20         629  
16 20     20   7719 use RPC::Switch::Client::Tiny::Netstring;
  20         40  
  20         1188  
17 20     20   8005 use RPC::Switch::Client::Tiny::Async;
  20         59  
  20         704  
18 20     20   8307 use RPC::Switch::Client::Tiny::SessionCache;
  20         40  
  20         112304  
19              
20             our $VERSION = '1.66';
21              
22             sub new {
23 234     234 1 421262 my ($class, %args) = @_;
24 234 50       1822 my $s = $args{sock} or croak __PACKAGE__ . " expects sock";
25 234 50       1313 unless ($^O eq 'MSWin32') { # cpantester: strawberry perl does not support blocking() call
26 234 50       1333 defined(my $b = $s->blocking()) or croak __PACKAGE__ . " bad socket: $!";
27 234 100       3134 unless ($b) { croak __PACKAGE__ . " nonblocking socket not supported"; }
  19         3857  
28             }
29 215 50       646 unless (exists $args{who}) { croak __PACKAGE__ . " expects who"; }
  0         0  
30 215         3875 my $self = bless {
31             %args,
32             id => 1, # next request id
33             state => '', # last rpcswitch.type
34             reqs => {}, # outstanding requests
35             channels => {}, # open rpcswitch channels
36             methods => {}, # defined worker methods
37             announced => {}, # announced worker methods
38             msglimit => 999999, # max netstring size
39             }, $class;
40 215 50       1580 if (ref($self->{sock}) eq 'IO::Socket::SSL') {
41 0 0       0 $self->{auth_method} = 'clientcert' unless exists $self->{auth_method};
42 0 0       0 $self->{token} = $self->{who} unless exists $self->{token}; # should be optional for clientcert
43             } else {
44 215 50       1085 $self->{auth_method} = 'password' unless exists $self->{auth_method};
45             }
46 215 100       1120 $self->{json_utf8} = $self->{client_encoding_utf8} ? {} : {utf8 => 1};
47 215         1050 return $self;
48             }
49              
50             sub rpc_error {
51 76     76 0 1349 return RPC::Switch::Client::Tiny::Error->new(@_);
52             }
53              
54             sub rpc_send {
55 864     864 0 3545 my ($self, $msg) = @_;
56 864         3627 my $s = $self->{sock};
57 864         3738 my $len = length($s);
58 864 50 33     9832 if ($self->{msglimit} && ($len > $self->{msglimit})) {
59 0         0 warn "rpc_send msglimit exceeded: $len > $self->{msglimit}";
60 0         0 return;
61             }
62 864         4349 $msg->{jsonrpc} = '2.0';
63 864         1592 my $str = to_json($msg, {canonical => 1, %{$self->{json_utf8}}});
  864         5326  
64 864 100       31099 $self->{trace_cb}->('SND', $msg) if $self->{trace_cb};
65 864         12320 return netstring_write($s, $str);
66             }
67              
68             sub rpc_send_req {
69 472     472 0 1593 my ($self, $method, $msg) = @_;
70 472         1902 my $id = "$self->{id}"; $self->{id}++;
  472         982  
71 472         1142 $msg->{id} = $id;
72 472         880 $msg->{method} = $method;
73 472         2619 $self->{reqs}{$id} = $method;
74 472 100       4318 $self->{state} = $method if $method =~ /^rpcswitch\./;
75 472 100       1559 $self->rpc_send($msg) or return;
76 467         1452 return $id;
77             }
78              
79             sub rpc_send_call {
80 176     176 0 1345 my ($self, $method, $params, $reqauth) = @_;
81              
82 176 50       557 if (defined $reqauth) { # request authentication
83             # Without the vcookie the rpcswitch does not validate
84             # the reqauth parameter.
85             # The vcookie 'eatme' value is hardcoded in the rpc-switch
86             # code and is called 'channel information version'.
87             #
88 0         0 return $self->rpc_send_req($method, {params => $params, rpcswitch => {vcookie => 'eatme', reqauth => $reqauth}});
89             } else {
90 176         752 return $self->rpc_send_req($method, {params => $params});
91             }
92             }
93              
94             sub rpc_decode {
95 849     849 0 2070 my ($self, $msg) = @_;
96 849         4744 my ($req, $rsp) = ('', '');
97              
98 849 50 66     6762 unless (($msg->{jsonrpc} eq '2.0') && (exists $msg->{id} || exists $msg->{method})) {
      66        
99 0         0 die rpc_error('jsonrpc', "bad json-rpc: ".to_json($msg, {canonical => 1}));
100             }
101 849 100       5430 if (exists $msg->{method}) {
    50          
    50          
102 420         1060 $req = $msg->{method};
103             } elsif (!defined $msg->{id}) {
104 0         0 die rpc_error('jsonrpc', "bad response id: ".to_json($msg, {canonical => 1}));
105             } elsif (!exists $self->{reqs}{$msg->{id}}) {
106 0         0 die rpc_error('jsonrpc', "unknown response $msg->{id}: ".to_json($msg, {canonical => 1}));
107             } else {
108 429         1403 $rsp = delete $self->{reqs}{$msg->{id}};
109              
110 429 50       1567 if (exists $msg->{error}) {
111 0         0 die rpc_error('jsonrpc', "$rsp $msg->{id} response error: $msg->{error}{message}", {code => $msg->{error}{code}});
112             }
113 429 50       1124 if (!exists $msg->{result}) {
114 0         0 die rpc_error('rpcswitch', "$rsp $msg->{id} response error: result missing");
115             }
116 429 50 100     1913 if ((ref($msg->{result}) ne 'ARRAY') && ($rsp ne 'rpcswitch.ping') && ($rsp ne 'rpcswitch.withdraw')) {
      66        
117 0         0 die rpc_error('rpcswitch', "$rsp $msg->{id} bad response: $msg->{result}");
118             }
119             }
120 849         4103 return ($req, $rsp);
121             }
122              
123             sub rpc_worker_announce {
124 159     159 0 478 my ($self, $workername) = @_;
125              
126             # ignore repeated announce request or unfinished withdraw
127             #
128 159 100       240 return if (keys %{$self->{announced}});
  159         684  
129              
130 130         259 foreach my $method (keys %{$self->{methods}}) {
  130         594  
131 145 100       501 next if exists $self->{methods}{$method}{id}; # active announce/withdraw request
132              
133 135         885 my $params = {method => $method, workername => $workername, doc => $self->{methods}{$method}{doc}};
134 135 50       587 $params->{filter} = $self->{methods}{$method}{filter} if exists $self->{methods}{$method}{filter};
135 135         504 my $id = $self->rpc_send_req('rpcswitch.announce', {params => $params});
136 135 50       701 die rpc_error('io', 'netstring_write') unless defined $id;
137 135         705 $self->{methods}{$method}{id} = $id;
138             }
139 130         341 return;
140             }
141              
142             sub rpc_worker_withdraw {
143 31     31 0 110 my ($self) = @_;
144              
145             # callers will get code -32006 'opposite end of channel gone'
146             # errors when the announcement is withdrawn.
147             #
148 31         54 foreach my $method (keys %{$self->{announced}}) {
  31         262  
149 16 100       176 next if exists $self->{methods}{$method}{id}; # active announce/withdraw request
150              
151 8         40 my $params = {method => $method};
152 8 50       176 $params->{filter} = $self->{methods}{$method}{filter} if exists $self->{methods}{$method}{filter};
153 8         168 my $id = $self->rpc_send_req('rpcswitch.withdraw', {params => $params});
154 8 50       88 die rpc_error('io', 'netstring_write') unless defined $id;
155 8         192 $self->{methods}{$method}{id} = $id;
156             }
157 31         117 return;
158             }
159              
160             sub rpc_worker_flowcontrol {
161 584     584 0 2578 my ($self, $workername) = @_;
162              
163             # need to be in connected auth state
164 584 100 100     5023 return unless ($self->{state} && ($self->{state} ne 'rpcswitch.hello'));
165              
166 452 100       1356 if ($self->{flowcontrol}) {
167 102         260 my $cnt = (scalar keys %{$self->{async}{jobs}}) + (scalar @{$self->{async}{jobqueue}});
  102         354  
  102         240  
168             #printf ">> flow: %d %d %d\n", $cnt, $self->{async}{max_async} * 2, $self->{async}{max_async};
169 102 100       472 if ($cnt >= $self->{async}{max_async} * 2) {
    100          
170 31         229 $self->rpc_worker_withdraw();
171             } elsif ($cnt < $self->{async}{max_async}) {
172 44         236 $self->rpc_worker_announce($workername);
173             }
174             }
175 452         947 return;
176             }
177              
178             sub valid_worker_err {
179 19     19 0 114 my ($err) = @_;
180 19 50       532 $err = {text => $err} unless ref($err); # convert plain errors
181 19 50       228 $err->{class} = 'hard' unless exists $err->{class};
182 19         57 return $err;
183             }
184              
185             sub rpcswitch_resp {
186 244     244 0 702 my ($rpcswitch) = @_;
187              
188             # Just the vcookie & vci-channel parameters are required by
189             # the rpcswitch. The worker_id field is optional, and might
190             # be set to the worker_id returned by the announce response.
191             #
192 244         923 $rpcswitch = {vcookie => $rpcswitch->{vcookie}, vci => $rpcswitch->{vci}};
193             #$rpcswitch = {vcookie => $rpcswitch->{vcookie}, vci => $rpcswitch->{vci}, worker_id => $rpcswitch->{worker_id}};
194 244         558 return $rpcswitch;
195             }
196              
197             sub client {
198 228     228 1 2071 my ($self, $msg, $method, $params, $reqauth) = @_;
199 228         798 my ($req, $rsp) = $self->rpc_decode($msg);
200              
201 228 100       1520 if ($req eq 'rpcswitch.greetings') {
    100          
    100          
    100          
    50          
    50          
202 38 50       380 my %token = $self->{token} ? (token => $self->{token}) : (); # should be optional for clientcert
203 38         228 my $helloparams = {who => $self->{who}, %token, method => $self->{auth_method}};
204 38         304 $self->rpc_send_req('rpcswitch.hello', {params => $helloparams});
205             } elsif ($rsp eq 'rpcswitch.hello') {
206 19 50       1596 if (!$msg->{result}[0]) {
207 0         0 die rpc_error('rpcswitch', "$rsp failed: $msg->{result}[1]");
208             }
209 19         703 $self->rpc_send_call($method, $params, $reqauth);
210             } elsif ($rsp eq 'rpcswitch.ping') {
211 19         76 return [$msg->{result}]; # ping complete
212             } elsif ($rsp eq $method) {
213 133 50       760 if (exists $msg->{rpcswitch}) { # internal rpcswitch methods have no channel
214 133         703 $self->{channels}{$msg->{rpcswitch}{vci}} = 0; # wait for channel_gone
215             }
216 133 100       1406 if ($msg->{result}[0] eq 'RES_WAIT') { # async worker notification (might use trace_cb to dump)
    100          
    50          
217 19         361 $self->{channels}{$msg->{rpcswitch}{vci}} = $msg->{result}[1]; # msg id
218             } elsif ($msg->{result}[0] eq 'RES_ERROR') { # worker error
219 19         380 my $e = valid_worker_err($msg->{result}[1]);
220 19         285 die rpc_error('worker', to_json($e), $e);
221             } elsif ($msg->{result}[0] eq 'RES_OK') {
222 95         722 return [@{$msg->{result}}[1..$#{$msg->{result}}]]; # client result[1..$]
  95         551  
  95         266  
223             }
224             } elsif ($req eq 'rpcswitch.result') {
225 0         0 my $channel = $msg->{rpcswitch}{vci};
226 0 0 0     0 if (($msg->{params}[0] eq 'RES_OK') && ($msg->{params}[1] eq $self->{channels}{$channel})) {
    0 0        
227 0         0 return [@{$msg->{params}}[2..$#{$msg->{params}}]]; # client result[2..$] (notification)
  0         0  
  0         0  
228             } elsif (($msg->{params}[0] eq 'RES_ERROR') && ($msg->{params}[1] eq $self->{channels}{$channel})) {
229 0         0 my $e = valid_worker_err($msg->{params}[2]);
230 0         0 die rpc_error('worker', to_json($e), $e);
231             }
232 0         0 die rpc_error('rpcswitch', "bad msg: $msg->{params}[0] $msg->{params}[1]");
233             } elsif ($req eq 'rpcswitch.channel_gone') {
234 19         209 my $channel = $msg->{params}{channel};
235 19 50       114 if (exists $self->{channels}{$channel}) {
236 19         95 my $id = delete $self->{channels}{$channel};
237 19         133 die rpc_error('rpcswitch', "$req for request $id: $channel");
238             }
239 0         0 die rpc_error('rpcswitch', "$req for unknown request: $channel");
240             } else {
241 0         0 die rpc_error('rpcswitch', "unsupported msg: ".to_json($msg, {canonical => 1}));
242             }
243 76         456 return;
244             }
245              
246             sub worker {
247 621     621 1 1814 my ($self, $msg, $workername) = @_;
248 621         2880 my ($req, $rsp) = $self->rpc_decode($msg);
249              
250 621 100       3115 if ($req eq 'rpcswitch.greetings') {
    100          
    100          
    50          
    100          
    50          
    0          
251 115 50       571 my %token = $self->{token} ? (token => $self->{token}) : (); # should be optional for clientcert
252 115         520 my $helloparams = {who => $self->{who}, %token, method => $self->{auth_method}};
253 115         499 $self->rpc_send_req('rpcswitch.hello', {params => $helloparams});
254             } elsif ($rsp eq 'rpcswitch.hello') {
255 115 50       1199 if (!$msg->{result}[0]) {
256 0         0 die rpc_error('rpcswitch', "$rsp failed: $msg->{result}[1]");
257             }
258 115         2081 $self->rpc_worker_announce($workername);
259             } elsif ($rsp eq 'rpcswitch.announce') {
260 135 50       686 if (!$msg->{result}[0]) {
261 0         0 die rpc_error('rpcswitch', "$rsp failed: $msg->{result}[1]");
262             }
263 135 100       1381 my ($method) = grep { exists $self->{methods}{$_}{id} && $self->{methods}{$_}{id} eq $msg->{id} } keys %{$self->{methods}};
  165         1622  
  135         1138  
264 135 50       571 if (!defined $method) {
265 0         0 die rpc_error('rpcswitch', "unknown $rsp response $msg->{id}: $msg->{result}[1]");
266             }
267             # register announced method
268 135         477 $self->{announced}{$method}{cb} = $self->{methods}{$method}{cb};
269 135         389 $self->{announced}{$method}{worker_id} = $msg->{result}[1]{worker_id};
270 135         351 delete $self->{methods}{$method}{id};
271             } elsif ($req eq 'rpcswitch.ping') {
272 0         0 $self->rpc_send({id => $msg->{id}, result => 'pong!'});
273             } elsif (exists $self->{announced}{$req}) {
274 248         625 $msg->{rpcswitch}{worker_id} = $self->{announced}{$req}{worker_id}; # save worker_id for response
275              
276 248         654 $self->{channels}{$msg->{rpcswitch}{vci}} = 0; # wait for channel_gone
277              
278 248 100       680 if ($self->{async}) { # use async call for forked childs
279 199         1365 $self->{async}->msg_enqueue($msg);
280             } else {
281 49         170 my $rpcswitch = rpcswitch_resp($msg->{rpcswitch});
282 49         326 my @resp = eval { $self->{announced}{$req}{cb}->($msg->{params}, $msg->{rpcswitch}) };
  49         310  
283 49 100       1210 if ($@) {
284 15         705 $self->rpc_send({id => $msg->{id}, result => ['RES_ERROR', $@], rpcswitch => $rpcswitch});
285             } else {
286 34         503 $self->rpc_send({id => $msg->{id}, result => ['RES_OK', @resp], rpcswitch => $rpcswitch});
287             }
288             }
289             } elsif ($rsp eq 'rpcswitch.withdraw') {
290             # Note: the rpcswitch sends just a boolean result here
291             #
292 8 50       72 if (!$msg->{result}) {
293 0         0 die rpc_error('rpcswitch', "$rsp failed: $msg->{result}");
294             }
295 8 50       24 my ($method) = grep { exists $self->{methods}{$_}{id} && $self->{methods}{$_}{id} eq $msg->{id} } keys %{$self->{methods}};
  8         352  
  8         32  
296 8 50       48 if (!defined $method) {
297 0         0 die rpc_error('rpcswitch', "unknown $rsp response $msg->{id}: $msg->{result}");
298             }
299             # remove announced method
300 8         312 delete $self->{announced}{$method};
301 8         96 delete $self->{methods}{$method}{id};
302             } elsif ($req eq 'rpcswitch.channel_gone') {
303 0         0 my $channel = $msg->{params}{channel};
304 0 0       0 if ($self->{async}) {
305 0     0   0 my ($childs, $msgs) = $self->{async}->jobs_terminate('gone', sub { $_[0]->{rpcswitch}{vci} eq $channel });
  0         0  
306 0 0       0 if (@$msgs) {
307 0         0 warn "worker removed queued messages on channel gone: ".join(' ', map { $_->{id} } @$msgs);
  0         0  
308             }
309             }
310 0 0       0 if (exists $self->{channels}{$channel}) {
311 0         0 delete $self->{channels}{$channel};
312             } else {
313 0         0 warn "worker $req for unknown request: $channel";
314             }
315             } else {
316 0         0 warn "worker unsupported msg: ".to_json($msg, {canonical => 1});
317             }
318 621         2425 return;
319             }
320              
321             sub is_session_req {
322 34     34 0 128 my ($self, $params) = @_;
323 34 100       319 return unless $self->{sessioncache};
324              
325 20 50 33     251 if (exists $params->{session} && exists $params->{session}{id}) {
326 20         97 return $params->{session};
327             }
328 0         0 return;
329             }
330              
331             sub is_session_resp {
332 34     34 0 395 my ($self, $params) = @_;
333 34 100       1027 return unless $self->{sessioncache};
334              
335 20 50 33     512 if ((ref($params) eq 'ARRAY') && ($params->[0] eq 'RES_OK') && ref($params->[2]) && exists $params->[2]->{set_session}) {
      33        
      33        
336 20         152 return $params->[2]->{set_session};
337             }
338 0         0 return;
339             }
340              
341             sub child_handler {
342 18     18 0 264 my ($self, $wr) = @_;
343              
344             # The child has to explicitly close the ssl-socket without shutdown.
345             # Otherwise the parent will get an EOF.
346             # see: https://metacpan.org/pod/IO::Socket::SSL#Common-Usage-Errors
347             #
348 18 50       555 if (ref($self->{sock}) eq 'IO::Socket::SSL') {
349 0         0 $self->{sock}->close(SSL_no_shutdown => 1);
350             } else {
351 18         690 close($self->{sock});
352             }
353 18         219 $self->{sock} = $wr;
354 18         137 delete $self->{trace_cb};
355 18         687 local $SIG{INT} = 'DEFAULT';
356 18         542 local $SIG{PIPE} = 'IGNORE'; # handle sigpipe via print/write result
357              
358             # When session handling is enabled a child might process
359             # more than one request with the same session_id.
360             #
361 18         668 while (1) {
362 24         345 my $b = eval { netstring_read($self->{sock}) };
  24         1230  
363 24 100       228 unless ($b) {
364 4 50 33     64 next if ($@ && ($@ =~ /^EINTR/)); # interrupted
365 4 50       45 die "worker child: $@" if $@;
366 4         11 last; # EOF
367             }
368 20         425 my $msg = eval { from_json($b, {%{$self->{json_utf8}}}) };
  20         202  
  20         864  
369 20 50       3291 die "worker child: $@" if $@;
370              
371             # The client catches all possible die() calls, so that it is
372             # guaranteed to call exit either from here or from a signal handler.
373             #
374 20         270 my $params;
375 20         318 my $callback = $self->{methods}{$msg->{method}}{cb};
376 20         120 my @resp;
377 20         165 eval {
378 20         483 local $SIG{PIPE} = 'DEFAULT'; # reenable sigpipe for worker code
379 20         579 @resp = $callback->($msg->{params}, $msg->{rpcswitch});
380             };
381 20 50       904196 if (my $err = $@) {
382 0         0 $params = ['RES_ERROR', $msg->{id}, $err];
383             } else {
384 20         183 $params = ['RES_OK', $msg->{id}, @resp];
385             }
386 20         214 $b = eval { to_json($params, {%{$self->{json_utf8}}}) };
  20         435  
  20         355  
387 20 50       1348 return 1 if $@; # signal die from json encode
388 20         353 my $res = netstring_write($self->{sock}, $b);
389 20 50       149 return 2 unless $res; # signal socket error
390              
391 20 100 66     140 last unless $self->is_session_resp($params) || $self->is_session_req($msg->{params});
392             }
393 18 100       595 close($self->{sock}) or return 3; # signal errors like broken pipe
394 17         469 return 0;
395             }
396              
397             sub _worker_child_write {
398 177     177   769 my ($self, $child, $msg) = @_;
399              
400 177         447 my $b = to_json($msg, {canonical => 1, %{$self->{json_utf8}}});
  177         7210  
401 177         21890 my $res = netstring_write($child->{reader}, $b); # forward request to worker child
402 177 50       777 die rpc_error('io', 'netstring_write') unless $res;
403 177         689 return;
404             }
405              
406             sub _worker_child_get {
407 195     195   408 my ($self, $msg) = @_;
408              
409             # First try to reuse child for existing session
410             #
411 195 100       571 if (my $sessioncache = $self->{sessioncache}) {
412 20 50       106 if (my $session_req = $self->is_session_req($msg->{params})) {
    0          
413 20 100       173 if (my $child = $sessioncache->session_get($session_req->{id}, $msg->{id}, $msg->{rpcswitch}{vci})) {
414 6         18 return $child;
415             }
416             } elsif ($sessioncache->{session_persist_user}) {
417 0 0       0 if (exists $msg->{params}{$sessioncache->{session_persist_user}}) {
418 0         0 my $user = $msg->{params}{$sessioncache->{session_persist_user}};
419 0 0       0 if (my $child = $sessioncache->session_get_per_user($user, $msg->{id}, $msg->{rpcswitch}{vci})) {
420 0         0 delete $child->{session}; # reused session will be added after session_resp
421 0         0 return $child;
422             }
423             }
424             }
425             }
426 189         740 my $child = $self->{async}->child_start($self, $msg->{id}, $msg->{rpcswitch}{vci});
427 171         4868 return $child;
428             }
429              
430             sub _worker_childs_dequeue_and_run {
431 602     602   1175 my ($self) = @_;
432              
433 602         2305 while (my $msg = $self->{async}->msg_dequeue()) {
434 195         504 my $id = $msg->{id};
435 195         531 my $rpcswitch_resp = rpcswitch_resp($msg->{rpcswitch});
436              
437 195         320 my $child = eval { $self->_worker_child_get($msg) };
  195         601  
438 177 50       840 if ($@) {
439 0         0 $self->rpc_send({id => $id, result => ['RES_ERROR', $@], rpcswitch => $rpcswitch_resp});
440             } else {
441 177         405 eval { $self->_worker_child_write($child, $msg) };
  177         2372  
442 177 50       759 if ($@) {
443 0         0 $self->rpc_send({id => $id, result => ['RES_ERROR', $@], rpcswitch => $rpcswitch_resp});
444 0         0 $self->{async}->child_finish($child, 'error');
445             } else {
446 177         10004 $self->rpc_send({id => $id, result => ['RES_WAIT', $id], rpcswitch => $rpcswitch_resp});
447 177         3304 $self->{async}->job_add($child, $msg->{id}, {rpcswitch => $rpcswitch_resp});
448             }
449             }
450             }
451 584         1101 return;
452             }
453              
454             sub _worker_child_read_and_finish {
455 166     166   681 my ($self, $child) = @_;
456              
457 166         532 my $res;
458 166         320 my $b = eval { netstring_read($child->{reader}) };
  166         1357  
459 166 100       586 unless ($b) {
460 14 50       280 my $err = $@ ? $@ : 'EOF';
461 14         448 $res = $self->rpc_send({method => 'rpcswitch.result', params => ['RES_ERROR', $child->{id}, $err], rpcswitch => $child->{rpcswitch}});
462 14         966 $self->{async}->child_finish($child, 'error');
463             } else {
464 152         275 my $params = eval { from_json($b, {%{$self->{json_utf8}}}) };
  152         281  
  152         1193  
465 152 50       7864 if ($@) {
466 0         0 $res = $self->rpc_send({method => 'rpcswitch.result', params => ['RES_ERROR', $child->{id}, $@], rpcswitch => $child->{rpcswitch}});
467 0         0 $self->{async}->child_finish($child, 'error');
468             } else {
469 152         1869 $res = $self->rpc_send({method => 'rpcswitch.result', params => $params, rpcswitch => $child->{rpcswitch}});
470 152 50       790 unless ($res) {
471 0         0 my $err = "result msg limit exceeded: " . length($b);
472 0         0 $res = $self->rpc_send({method => 'rpcswitch.result', params => ['RES_ERROR', $child->{id}, $err], rpcswitch => $child->{rpcswitch}});
473 0         0 $self->{async}->child_finish($child, 'error');
474 0         0 return $res;
475             }
476              
477 152 100       556 if (my $sessioncache = $self->{sessioncache}) {
478 14 50       91 if (my $set_session = $self->is_session_resp($params)) {
479 14 100 66     126 if (!exists $child->{session} || ($child->{session}{id} ne $set_session->{id})) {
480 8         77 $child->{session} = $sessioncache->session_new($set_session);
481 8         89 $sessioncache->expire_insert($child->{session});
482             }
483             }
484              
485 14 100       56 if ($sessioncache->session_put($child)) {
    50          
486 9         27 my $cnt = scalar keys %{$sessioncache->{active}};
  9         30  
487 9 50       24 if ($cnt > $sessioncache->{max_session}) {
488 0 0       0 if ($child = $sessioncache->lru_dequeue()) {
489 0         0 $self->{async}->child_finish($child, 'lru');
490             }
491             }
492 9         21 $child = undef;
493             } elsif (my $idle_child = $sessioncache->session_get_per_user_idle($child)) {
494             # update idle user session with older session_id
495             #
496 0         0 $self->{async}->child_finish($idle_child, 'update');
497              
498 0 0       0 if ($sessioncache->session_put($child)) {
499 0         0 $child = undef;
500             }
501             }
502             }
503 152 100       511 if ($child) {
504 143         558 $self->{async}->child_finish($child, 'done');
505             }
506             }
507             }
508 166         1011 return $res;
509             }
510              
511             sub _worker_sessions_expire {
512 602     602   1216 my ($self) = @_;
513 602 100       1588 return unless $self->{sessioncache};
514              
515             # If a job for the expired session is active, the session
516             # will be dropped when session_put() is called after the
517             # job completed.
518             #
519 60         241 while (my $child = $self->{sessioncache}->expired_dequeue()) {
520 0         0 $self->{async}->child_finish($child, 'expired');
521             }
522 60         113 return;
523             }
524              
525             sub rpc_timeout {
526 1091     1091 0 2353 my ($self, $call_timeout) = @_;
527              
528 1091 100 66     3293 if ($call_timeout && (keys %{$self->{reqs}} > 0)) {
  19         285  
529 19         190 return $call_timeout; # for individual client call
530             }
531 1072         2207 return $self->{timeout};
532             }
533              
534             sub rpc_stopped {
535 1109     1109 0 3284 my ($self) = @_;
536              
537 1109 50       4598 if ($self->{stop}) {
538 0 0 0     0 if (($self->{stop} eq 'withdraw') && (keys %{$self->{announced}})) {
  0 0 0     0  
      0        
539 0         0 return; # wait for withdraw to complete
540 0         0 } elsif (($self->{stop} eq 'withdraw') && $self->{async} && (keys %{$self->{async}{jobs}})) {
541 0         0 return; # wait for active jobs to complete
542             }
543 0         0 return 1;
544             }
545             }
546              
547             sub rpc_handler {
548 305     305 0 1188 my ($self, $call_timeout, $handler, @handler_params) = @_;
549              
550             # returns response or throws rpc_error.
551             # returns undef when remote side cleanly closed connection with EOF.
552             #
553 305         947 while (!$self->rpc_stopped()) {
554 1109         2457 my @pipes = ();
555 1109 100       2742 if ($self->{async}) {
556 602         2139 $self->_worker_sessions_expire();
557 602 50       3799 $self->_worker_childs_dequeue_and_run() unless $self->{stop};
558 584         5159 $self->{async}->childs_reap(nonblock => 1);
559 584         4484 $self->rpc_worker_flowcontrol(@handler_params);
560 584         957 @pipes = map { $_->{reader} } values %{$self->{async}{jobs}};
  428         1525  
  584         3674  
561             }
562 1091         5442 my $timeout = $self->rpc_timeout($call_timeout);
563              
564 1091 100 100     5990 if ($timeout || @pipes) {
565 440         9262 my @ready = IO::Select->new(($self->{sock}, @pipes))->can_read($timeout);
566 440 50 66     13615112 next if (@ready == 0) && $!{EINTR}; # $! is not reset on success
567 440 100       2144 die rpc_error('jsonrpc', 'receive timeout') unless (@ready > 0);
568              
569 421         2248 foreach my $fh (@ready) {
570 461 50 66     4488 if (($fh != $self->{sock}) && $self->{async}) {
571 166 50       1051 unless (exists $self->{async}{jobs}{$fh->fileno}) {
572 0         0 die rpc_error('io', "child pipe not found: ". $fh->fileno);
573             }
574 166         1690 my $child = $self->{async}{jobs}{$fh->fileno};
575 166         1373 $self->{async}->job_rem($child);
576 166         2912 my $res = $self->_worker_child_read_and_finish($child);
577             }
578             }
579 421 100       1022 next unless grep { $_ == $self->{sock} } @ready;
  461         3819  
580             }
581              
582             # always block on full messages from rpcswitch
583 946         1789 my $b = eval { netstring_read($self->{sock}) };
  946         3591  
584 946 100       3475 unless ($b) {
585 97 50 33     420 next if ($@ && ($@ =~ /^EINTR/)); # check if stopped
586 97 50       242 die rpc_error('io', $@) if $@;
587 97         297 return; # EOF
588             }
589 849         1359 my $msg = eval { from_json($b, {%{$self->{json_utf8}}}) };
  849         1419  
  849         6132  
590 849 100       33228 die rpc_error('jsonrpc', $@) if $@;
591 830 100       2471 $self->{trace_cb}->('RCV', $msg) if $self->{trace_cb};
592 830         8028 my $res = eval { $handler->($self, $msg, @handler_params) };
  830         3564  
593 830 100       2503 if (my $err = $@) {
594 38 50       1501 die $err if ref($err); # forward error
595 0         0 die rpc_error('io', $err);
596             }
597 792 100       3823 if ($res) {
598 114         684 return $res;
599             }
600             }
601 0         0 return; # STOP is checked by caller
602             }
603              
604             sub work {
605 134     134 1 3339 my ($self, $workername, $methods, $opts) = @_;
606              
607             # a write on a shutdown socket should never happen
608             #
609 134     0   4014 local $SIG{'PIPE'} = sub { die "work[$$]: got PIPE!\n" };
  0         0  
610              
611 134         913 foreach my $method (keys %$methods) {
612 130         558 $self->{methods}{$method}{cb} = $methods->{$method}{cb};
613 130 100       612 $self->{methods}{$method}{doc} = (defined $methods->{$method}{doc}) ? $methods->{$method}{doc} : {};
614 130 50       613 $self->{methods}{$method}{filter} = $methods->{$method}{filter} if exists $methods->{$method}{filter};
615             }
616 134 100       578 $opts->{trace_cb} = $self->{trace_cb} if exists $self->{trace_cb};
617 134 100       2238 $self->{async} = RPC::Switch::Client::Tiny::Async->new(%$opts) if $opts->{max_async};
618 134 50       644 $self->{flowcontrol} = $opts->{flowcontrol} if $opts->{flowcontrol};
619 134 100       708 $self->{sessioncache} = RPC::Switch::Client::Tiny::SessionCache->new(%$opts) if $opts->{max_session};
620 134         792 $self->rpc_handler(0, \&worker, $workername);
621              
622 97 50       724 if ($self->{stop}) {
623 0         0 $self->rpc_worker_withdraw();
624 0         0 $self->{stop} = 'withdraw';
625              
626             # wait some time for withdraw & active jobs to complete
627             #
628 0     0   0 local $SIG{ALRM} = sub { warn "worker child stop timeout\n"; $self->{stop} = 'timeout'; };
  0         0  
  0         0  
629 0         0 alarm($self->{gracetime});
630 0         0 $self->rpc_handler(0, \&worker, $workername);
631 0         0 alarm(0);
632 0         0 die rpc_error('io', 'STOP');
633             }
634 97 100       382 if (my $async = $self->{async}) {
635             # drop stored sessions
636             #
637 48 100       507 if (my $sessioncache = $self->{sessioncache}) {
638 4         12 foreach my $session_id (keys %{$sessioncache->{active}}) {
  4         47  
639 3 50       33 if (my $child = $sessioncache->session_get($session_id)) {
640 3         27 $async->child_finish($child, 'idle');
641             }
642             }
643             }
644             # reap remaining childs
645             #
646 48 50       172 if (keys %{$async->{finished}}) {
  48         291  
647 48     0   2840 local $SIG{ALRM} = sub { warn "worker child wait timeout\n" };
  0         0  
648 48         700 alarm(1); # wait at most for one second
649 48 50       344 unless ($async->childs_reap()) { # blocking
650 0         0 $async->childs_reap(nonblock => 1); # continue nonblocking after timeout
651             }
652 48         1987 alarm(0);
653             }
654              
655             # EOF is only an error here when there are outstanding requests
656             #
657 48     0   1002 my ($stopped, $msgs) = $async->jobs_terminate('stopped', sub { 1 });
  0         0  
658 48         236 my @childs = keys %{$async->{finished}};
  48         198  
659              
660 48         448 $async->childs_kill(); # don't wait here
661              
662 48         337 $async->{jobqueue} = [];
663 48         241 $async->{finished} = {};
664              
665 48 50       199 die rpc_error('io', 'eof while jobs active: '.join(' ', @childs)) if (@childs);
666 48 50       245 die rpc_error('io', 'eof while jobs queued: '.join(' ', @$msgs)) if (@$msgs);
667             }
668 97         3217 return;
669             }
670              
671             sub call {
672 171     171 1 181868 my ($self, $method, $params, $opts) = @_;
673 171         456 my $reqauth = $opts->{reqauth};
674 171         323 my $call_timeout = $opts->{timeout};
675              
676 171 100       608 if ($self->{state} eq 'rpcswitch.hello') { # trigger rpc_send for consecutive requests
677 133         779 $self->rpc_send_call($method, $params, $reqauth);
678             }
679             # EOF is an error here (response missing)
680             #
681 171 50       1254 my $res = $self->rpc_handler($call_timeout, \&client, $method, $params, $reqauth) or die rpc_error('io', 'eof');
682 114 100       779 return wantarray() ? @$res : $res->[0];
683             }
684              
685             # stop() exits an active $client->work() worker handler.
686             #
687             # - work() dies with RPC::Switch::Client::Tiny::Error which
688             # might be {type => 'io', message => 'STOP'}, or any other
689             # error if a non-restartable system call was interrupted.
690             #
691             # - stop() makes no sense for call() (it has rpc_timeout)
692             # - the only way to call stop is from a signal handler.
693             # - if a signal handler is called, non-restartavle perl
694             # system call are interrupted and return $! == EINTR.
695             #
696             # -> this can break an active worker handler and result in
697             # a RES_ERROR-message to the caller if a non-restartable
698             # perl syscall is interrupted.
699             # -> for the async worker mode this should mostly work.
700             # (sysreadfull, rpc print & IO::Select are restartable).
701             # -> stop will just wait for a gracetime of 2 seconds
702             # for active jobs to complete. The remaining jobs
703             # are terminated.
704             #
705             sub stop {
706 0     0 0 0 my ($self, $opts) = @_;
707 0 0       0 $self->{gracetime} = $opts->{gracetime} ? $opts->{gracetime} : 2;
708 0         0 $self->{stop} = 'pending';
709 0         0 return;
710             }
711              
712             # The perl object destroy order is undefined, so $self->{sock}
713             # might already be destroyed and it makes no sense to try to
714             # send RES_ERROR messages for remaining childs.
715             # see: https://perldoc.perl.org/perlobj#Global-Destruction
716             #
717             # So just terminate remaining childs, and let init-process
718             # collect them instead of calling waitpid() here.
719             #
720             # TODO: perl will call DESTROY only when the process exits
721             # cleanly or calls exit(). If the process is killed, perl
722             # calls DESTROY only if a handler for the matching signal
723             # is installed, like: $SIG{'TERM'} = sub { exit; };
724             #
725             # -> so does it make sense to support DESTROY at all,
726             # if there are situations when it is not called?
727             #
728             sub DESTROY {
729 196     196   3569 my ($self) = @_;
730 196 100       1966 $self->{async}->childs_kill() if $self->{async}; # don't wait here
731             }
732              
733             1;
734              
735             __END__