File Coverage

blib/lib/RPC/Switch/Client.pm
Criterion Covered Total %
statement 61 466 13.0
branch 1 192 0.5
condition 0 80 0.0
subroutine 21 76 27.6
pod 10 15 66.6
total 93 829 11.2


line stmt bran cond sub pod time code
1             package RPC::Switch::Client;
2 1     1   812 use Mojo::Base 'Mojo::EventEmitter';
  1         3  
  1         8  
3              
4             our $VERSION = '0.22'; # VERSION
5              
6             #
7             # Mojo's default reactor uses EV, and EV does not play nice with signals
8             # without some handholding. We either can try to detect EV and do the
9             # handholding, or try to prevent Mojo using EV.
10             #
11             BEGIN {
12 1 50   1   1899 $ENV{'MOJO_REACTOR'} = 'Mojo::Reactor::Poll' unless $ENV{'MOJO_REACTOR'};
13             }
14              
15             # more Mojolicious
16 1     1   539 use Mojo::IOLoop;
  1         160357  
  1         16  
17 1     1   41 use Mojo::IOLoop::Stream;
  1         3  
  1         6  
18 1     1   516 use Mojo::Log;
  1         13309  
  1         9  
19              
20             # standard perl
21 1     1   39 use Carp qw(croak);
  1         2  
  1         48  
22 1     1   6 use Scalar::Util qw(blessed refaddr);
  1         3  
  1         43  
23 1     1   6 use Cwd qw(realpath);
  1         2  
  1         38  
24 1     1   5 use Data::Dumper;
  1         2  
  1         47  
25 1     1   5 use Encode qw(encode_utf8 decode_utf8);
  1         2  
  1         58  
26 1     1   9 use File::Basename;
  1         1  
  1         63  
27 1     1   6 use IO::Handle;
  1         2  
  1         78  
28 1     1   15 use POSIX ();
  1         4  
  1         35  
29 1     1   6 use Scalar::Util qw(blessed refaddr weaken);
  1         3  
  1         57  
30 1     1   699 use Storable;
  1         3391  
  1         57  
31 1     1   466 use Sys::Hostname;
  1         1003  
  1         71  
32              
33             # from cpan
34 1     1   481 use JSON::RPC2::TwoWay 0.07; # for configurable json encoder
  1         4130  
  1         30  
35             # JSON::RPC2::TwoWay depends on JSON::MaybeXS anyways, so it can be used here
36             # without adding another dependency
37 1     1   7 use JSON::MaybeXS qw();
  1         2  
  1         21  
38 1     1   466 use MojoX::NetstringStream 0.06;
  1         1277  
  1         8  
39              
40             # us
41 1     1   505 use RPC::Switch::Client::Steps;
  1         3  
  1         8  
42              
43             has [qw(
44             actions address auth cb_used channels clientid conn debug ioloop
45             json lastping log method ns ping_timeout port rpc timeout tls token
46             who
47             )];
48              
49             # keep in sync with the rpc-switch
50             use constant {
51 1         6765 RES_OK => 'RES_OK',
52             RES_WAIT => 'RES_WAIT',
53             RES_TIMEOUT => 'RES_TIMEOUT',
54             RES_ERROR => 'RES_ERROR',
55             RES_OTHER => 'RES_OTHER', # 'dunno'
56             WORK_OK => 0, # exit codes for work method
57             WORK_PING_TIMEOUT => 92,
58             WORK_CONNECTION_CLOSED => 91,
59 1     1   71 };
  1         2  
60              
61             sub new {
62 0     0 1   my ($class, %args) = @_;
63 0           my $self = $class->SUPER::new();
64              
65 0   0       my $debug = $args{debug} // 0; # or 1?
66              
67 0   0       $self->{address} = $args{address} // '127.0.0.1';
68 0           $self->{cb_used} = {}; # avoid calling cb twice in a timeout scenario
69 0           $self->{channels} = {}; # per channel hash of waitids
70 0           $self->{debug} = $debug;
71 0   0       $self->{json} = $args{json} // 1;
72             $self->{jsonobject} = $args{jsonobject} // JSON::MaybeXS->new(utf8 => 1),
73 0   0       $self->{ping_timeout} = $args{ping_timeout} // 300;
      0        
74 0   0       $self->{ioloop} = $args{ioloop} // Mojo::IOLoop->singleton;
75             $self->{log} = $args{log}
76 0 0 0       // Mojo::Log->new(level => ($debug) ? 'debug' : 'info');
77 0   0       $self->{method} = $args{method} // 'password';
78 0   0       $self->{port} = $args{port} // 6551;
79 0   0       $self->{timeout} = $args{timeout} // 60;
80 0   0       $self->{tls} = $args{tls} // 0;
81 0           $self->{tls_ca} = $args{tls_ca};
82 0           $self->{tls_cert} = $args{tls_cert};
83 0           $self->{tls_key} = $args{tls_key};
84 0 0         $self->{token} = $args{token} or croak 'no token?';
85 0 0         $self->{who} = $args{who} or croak 'no who?';
86 0   0       $self->{autoconnect} = $args{autoconnect} // 1;
87              
88 0 0         return $self unless $self->{autoconnect};
89              
90 0           $self->connect;
91              
92 0 0         return $self if $self->{auth};
93 0           return;
94             }
95              
96             sub connect {
97 0     0 1   my $self = shift;
98              
99 0           delete $self->ioloop->{__exit__};
100 0           delete $self->{auth};
101 0           $self->{actions} = {};
102              
103             $self->on(disconnect => sub {
104 0     0     my ($self, $code) = @_;
105             #$self->{_exit} = $code;
106 0           $self->ioloop->stop;
107 0           });
108              
109 0           my $debug = do {
110 0           weaken(my $self = $self);
111             $self->{debug} ? sub {
112 0 0   0     if ($self->{log}) {
113 0           $self->log->debug(@_)
114             } else {
115 0           warn join(' ', @_)."\n";
116             }
117             } : undef
118 0 0         };
119              
120             my $rpc = JSON::RPC2::TwoWay->new(
121             debug => $debug,
122             json => $self->{jsonobject},
123 0 0         ) or croak 'no rpc?';
124 0     0     $rpc->register('rpcswitch.greetings', sub { $self->rpc_greetings(@_) }, notification => 1);
  0            
125 0     0     $rpc->register('rpcswitch.ping', sub { $self->rpc_ping(@_) });
  0            
126 0     0     $rpc->register('rpcswitch.channel_gone', sub { $self->rpc_channel_gone(@_) }, notification => 1);
  0            
127             $rpc->register(
128             'rpcswitch.result',
129 0     0     sub { $self->rpc_result(@_) },
130 0           by_name => 0,
131             notification => 1,
132             raw => 1
133             );
134              
135             my $clarg = {
136             address => $self->{address},
137             port => $self->{port},
138             tls => $self->{tls},
139 0           };
140 0 0         $clarg->{tls_ca} = $self->{tls_ca} if $self->{tls_ca};
141 0 0         $clarg->{tls_cert} = $self->{tls_cert} if $self->{tls_cert};
142 0 0         $clarg->{tls_key} = $self->{tls_key} if $self->{tls_key};
143              
144             my $clientid = $self->ioloop->client(
145             $clarg => sub {
146 0     0     my ($loop, $err, $stream) = @_;
147 0 0         if ($err) {
148 0           $err =~ s/\n$//s;
149 0           $self->log->error('connection to API failed: ' . $err);
150 0           $self->{auth} = 0;
151 0           return;
152             }
153 0           my $ns = MojoX::NetstringStream->new(stream => $stream);
154 0           $self->{ns} = $ns;
155             my $conn = $rpc->newconnection(
156             owner => $self,
157 0           write => sub { $ns->write(@_) },
158 0           );
159 0           $self->{conn} = $conn;
160             $ns->on(chunk => sub {
161 0           my ($ns2, $chunk) = @_;
162             #say 'got chunk: ', $chunk;
163 0           my @err = $conn->handle($chunk);
164 0 0         $self->log->info('chunk handler: ' . join(' ', grep defined, @err)) if @err;
165 0 0         $ns->close if $err[0];
166 0           });
167             $ns->on(close => sub {
168             # this cb is called during global destruction, at
169             # least on old perls where
170             # Mojo::Util::_global_destruction() won't work
171 0 0         return unless $conn;
172 0           $conn->close;
173 0           $self->log->warn('connection to rpcswitch closed');
174 0           $self->emit(disconnect => WORK_CONNECTION_CLOSED); # todo: doc
175 0           });
176 0           });
177              
178 0           $self->{rpc} = $rpc;
179 0           $self->{clientid} = $clientid;
180              
181             # handle timeout?
182             my $tmr = $self->ioloop->timer($self->{timeout} => sub {
183 0     0     my $loop = shift;
184 0           $self->log->error('timeout wating for greeting');
185 0           $loop->remove($clientid); # disconnect
186 0           $self->{auth} = 0;
187 0           });
188              
189 0           $self->log->debug('starting handshake');
190            
191             # fixme: catch signals?
192 0     0     $self->_loop(sub { not defined $self->{auth} });
  0            
193              
194 0           $self->log->debug('done with handhake?');
195              
196 0           $self->ioloop->remove($tmr);
197 0           $self->unsubscribe('disconnect');
198              
199 0           return $self->{auth};
200             }
201              
202             sub is_connected {
203 0     0 1   my $self = shift;
204 0   0       return $self->{auth} && !$self->ioloop->{__exit__};
205             }
206              
207             sub rpc_greetings {
208 0     0 0   my ($self, $c, $i) = @_;
209             RPC::Switch::Client::Steps->new(ioloop => $self->ioloop)->steps([sub{
210 0     0     my $steps = shift;
211 0 0         die "wrong api version $i->{version} (expected 1.0)" unless $i->{version} eq '1.0';
212 0           $self->log->info('got greeting from ' . $i->{who});
213 0           $c->call(
214             'rpcswitch.hello',
215             {who => $self->who, method => $self->method, token => $self->token},
216             $steps->next,
217             );
218             }, sub {
219 0     0     my ($steps, $e, $r) = @_;
220 0           my $w;
221 0 0         die "hello returned error $e->{message} ($e->{code})" if $e;
222 0 0         die 'no results from hello?' unless $r;
223 0           ($r, $w) = @$r;
224 0 0         if ($r) {
225 0           $self->log->info("hello returned: $r, $w");
226 0           $self->{auth} = 1;
227             } else {
228 0   0       $self->log->error('hello failed: ' . ($w // ''));
229 0           $self->{auth} = 0; # defined but false
230             }
231             }],sub {
232 0     0     my ($err) = @_;
233 0           $self->log->error('something went wrong in handshake: ' . $err);
234 0           $self->{auth} = '';
235 0           });
236             }
237              
238             sub call {
239 0     0 1   my ($self, %args) = @_;
240 0           my ($done, $status, $outargs);
241             $args{waitcb} = sub {
242 0     0     ($status, $outargs) = @_;
243 0 0 0       unless ($status and $status eq RES_WAIT) {
244 0   0       $self->log->error('unexpected status: ' . ($status // 'undef'));
245 0           $done++;
246 0           return;
247             }
248 0           $self->log->debug("gotta wait for $outargs");
249 0           };
250             $args{resultcb} = sub {
251 0     0     ($status, $outargs) = @_;
252 0           $done++;
253 0           };
254 0           $self->call_nb(%args);
255              
256 0     0     $self->_loop(sub { !$done });
  0            
257              
258 0           return $status, $outargs;
259             }
260              
261             sub call_nb {
262 0     0 1   my ($self, %args) = @_;
263 0 0         my $method = $args{method} or die 'no method?';
264 0           my $vtag = $args{vtag};
265 0   0       my $inargs = $args{inargs} // '{}';
266 0           my $waitcb = $args{waitcb}; # optional
267 0   0       my $rescb = $args{resultcb} // die 'no result callback?';
268 0   0       my $timeout = $args{timeout} // 0; # accommodate existing code where this didn't work
269 0           my $reqauth = $args{reqauth};
270 0           my $inargsj;
271              
272 0 0         if ($self->{json}) {
273 0           $inargsj = $inargs;
274 0           $inargs = $self->{jsonobject}->decode($inargs);
275 0 0         croak 'inargs is not a json object' unless ref $inargs eq 'HASH';
276             } else {
277 0 0         croak 'inargs should be a hashref' unless ref $inargs eq 'HASH';
278             # test encoding
279 0           $inargsj = $self->{jsonobject}->encode($inargs);
280 0 0         if ($reqauth) {
281             }
282             }
283              
284 0 0         if ($reqauth) {
285 0 0         if (blessed($reqauth)) {
286 0 0         if ($reqauth->can('_to_reqauth')) {
287             # duck typing in action
288 0           $reqauth = $reqauth->_to_reqauth();
289             } else {
290 0           croak "Don't know how to convert $reqauth to reqauth hash";
291             }
292             }
293 0 0         croak 'reqauth should be a hashref' unless ref $reqauth eq 'HASH';
294             }
295              
296 0           my $req = {
297             rescb => $rescb,
298             };
299              
300 0 0         if ($timeout > 0) {
301             $req->{tmr} = $self->ioloop->timer($timeout => sub {
302 0 0   0     my $rescb = delete $req->{rescb} or return;
303 0           $rescb->(RES_TIMEOUT, "timed out after $timeout seconds");
304 0           });
305             }
306              
307 0           $inargsj = decode_utf8($inargsj);
308 0 0         $self->log->debug("calling $method with '" . $inargsj . "'" . (($vtag) ? " (vtag $vtag)" : ''));
309              
310             RPC::Switch::Client::Steps->new(ioloop => $self->ioloop)->steps([sub{
311 0     0     my $steps = shift;
312 0 0         $self->conn->callraw({
313             method => $method,
314             params => $inargs,
315             ($reqauth ? (rpcswitch => { vcookie => 'eatme', reqauth => $reqauth }) : ()),
316             }, $steps->next );
317             }, sub {
318             #$self->log->debug('got response:' . Dumper(\@_));
319 0     0     my ($steps, $e, $r) = @_;
320 0 0         if ($e) {
321 0           $e = $e->{error};
322 0           $self->log->error("call returned error: $e->{message} ($e->{code})");
323 0 0         $rescb->(RES_ERROR, "$e->{message} ($e->{code})") if $rescb;
324 0           return;
325             }
326 0           my ($rescb, $tmr) = @{$req}{qw(rescb tmr)};
  0            
327 0 0         return unless $rescb; # $rescb is undef if a timeout happeded
328 0           my ($status, $outargs) = @{$r->{result}};
  0            
329 0 0         if ($status eq RES_WAIT) {
330 0           my $vci = $r->{rpcswitch}->{vci};
331 0 0         unless ($vci) {
332 0           $self->log->error("missing rpcswitch vci after RES_WAIT");
333 0           return;
334             }
335              
336             # note the relation to the channel so we can throw an error if
337             # the channel disappears
338             # outargs should contain waitid
339             # autovivification ftw?
340 0           $self->{channels}->{$vci}->{$outargs} = $req;
341 0 0         $waitcb->($status, $outargs) if $waitcb;
342             } else {
343             $outargs = $self->{jsonobject}->encode($outargs)
344 0 0 0       if $self->{json} and ref $outargs;
345 0           $rescb->($status, $outargs);
346 0 0         $self->ioloop->remove($tmr) if $tmr;
347             }
348 0           return;
349             }], sub {
350 0     0     my ($err) = @_;
351 0           $self->log->error("Something went wrong in call_nb: $err");
352 0           my ($rescb, $tmr) = @{$req}{qw(rescb tmr)};
  0            
353 0 0         $rescb->(RES_ERROR, $err) if $rescb;
354 0 0         $self->ioloop->remove($tmr) if $tmr;
355 0           return @_;
356 0           });
357 0           return;
358             }
359              
360             sub get_status {
361 0     0 1   my ($self, $wait_id, $notify) = @_;
362            
363 0           my ($ns, $id) = split /:/, $wait_id, 2;
364            
365 0 0         die "no namespace in waitid?" unless $ns;
366            
367 0 0         my $inargs = {
368             wait_id => $wait_id,
369             notify => ($notify ? JSON->true : JSON->false),
370             };
371             # meh:
372 0 0         $inargs = $self->{jsonobject}->encode($inargs) if $self->{json};
373              
374             # fixme: reuse call?
375 0           my ($done, $status, $outargs);
376             my %args = (
377             method => "$ns._get_status",
378             inargs => $inargs,
379             waitcb => sub {
380 0     0     ($status, $outargs) = @_;
381 0 0 0       die "unexpected status" unless $status and $status eq RES_WAIT;
382 0 0         $done++ unless $notify;
383             },
384             resultcb => sub {
385 0     0     ($status, $outargs) = @_;
386 0           $done++;
387             },
388 0           );
389 0           $self->call_nb(%args);
390              
391 0     0     $self->_loop(sub { !$done });
  0            
392              
393 0           return $status, $outargs;
394             }
395              
396             sub rpc_result {
397 0     0 0   my ($self, $c, $r) = @_;
398             #$self->log->debug('got result: ' . Dumper($r));
399 0           my ($status, $id, $outargs) = @{$r->{params}};
  0            
400 0 0         return unless $id;
401 0           my $vci = $r->{rpcswitch}->{vci};
402 0 0         return unless $vci;
403 0           my $req = delete $self->{channels}->{$vci}->{$id};
404 0 0         return unless $req;
405 0           my ($rescb, $tmr) = @{$req}{qw(rescb tmr)};
  0            
406 0 0         return unless $rescb;
407 0 0         $self->ioloop->remove($tmr) if $tmr;
408             $outargs = $self->{jsonobject}->encode($outargs)
409 0 0 0       if $self->{json} and ref $outargs;
410 0           $rescb->($status, $outargs);
411 0           return;
412             }
413              
414             sub rpc_channel_gone {
415 0     0 0   my ($self, $c, $a) = @_;
416 0           my $ch = $a->{channel};
417 0           $self->log->debug("got channel_gone: $ch");
418 0 0         return unless $ch;
419 0           my $wl = delete $self->{channels}->{$ch};
420 0 0         return unless $wl;
421 0           for (values %$wl) {
422 0           my ($rescb, $tmr) = @{$_}{qw(rescb tmr)};
  0            
423 0 0         $self->ioloop->remove($tmr) if $tmr;
424 0 0         $rescb->(RES_ERROR, 'channel gone') if $rescb;
425             }
426 0           return;
427             }
428              
429             sub ping {
430 0     0 1   my ($self, $timeout) = @_;
431              
432 0   0       $timeout //= $self->timeout;
433 0           my ($done, $ret);
434              
435             $self->ioloop->timer($timeout => sub {
436 0     0     $done++;
437 0           });
438              
439             $self->conn->call('rpcswitch.ping', {}, sub {
440 0     0     my ($e, $r) = @_;
441 0 0 0       if (not $e and $r and $r =~ /pong/) {
      0        
442 0           $ret = 1;
443             } else {
444 0           %$self = ();
445             }
446 0           $done++;
447 0           });
448              
449             # we could recurse here
450 0     0     $self->_loop(sub { !$done });
  0            
451              
452 0           return $ret;
453             }
454              
455             sub work {
456 0     0 1   my ($self, $prepare) = @_;
457              
458 0           my $pt = $self->ping_timeout;
459 0           my $tmr;
460             $tmr = $self->ioloop->recurring($pt => sub {
461 0     0     my $ioloop = shift;
462 0   0       $self->log->debug('in ping_timeout timer: lastping: '
463             . ($self->lastping // 0) . ' limit: ' . (time - $pt) );
464 0 0 0       return if ($self->lastping // 0) > time - $pt;
465 0           $self->log->error('ping timeout');
466 0           $ioloop->remove($self->clientid);
467 0           $ioloop->remove($tmr);
468 0           $ioloop->{__exit__} = WORK_PING_TIMEOUT; # todo: doc
469 0           $ioloop->stop;
470 0 0         }) if $pt > 0;
471             $self->on(disconnect => sub {
472 0     0     my ($self, $code) = @_;
473 0           $self->ioloop->{__exit__} = $code;
474 0           $self->ioloop->stop;
475 0           });
476 0 0         return 0 if $prepare;
477 0           $self->ioloop->{__exit__} = WORK_OK;
478 0           $self->log->debug(blessed($self) . ' starting work');
479 0 0         $self->ioloop->start unless $self->ioloop->is_running;
480 0           $self->log->debug(blessed($self) . ' done?');
481 0 0         $self->ioloop->remove($tmr) if $tmr;
482              
483 0           return $self->ioloop->{__exit__};
484             }
485              
486             sub stop {
487 0     0 1   my ($self, $exit) = @_;
488 0           $self->ioloop->{__exit__} = $exit;
489 0           $self->ioloop->stop;
490             }
491              
492             sub announce {
493 0     0 1   my ($self, %args) = @_;
494 0 0         my $method = $args{method} or croak 'no method?';
495 0 0         my $cb = $args{cb} or croak 'no cb?';
496             #my $async = $args{async} // 0;
497 0 0 0       my $mode = $args{mode} // (($args{async}) ? 'async' : 'sync');
498 0 0         croak "unknown callback mode $mode" unless $mode =~ /^(subproc|async|async2|sync)$/;
499 0           my $host = hostname;
500 0   0       my $workername = $args{workername} // "$self->{who} $host $0 $$";
501            
502 0 0         croak "already have action $method" if $self->actions->{$method};
503            
504 0           my ($done, $err);
505             RPC::Switch::Client::Steps->new(ioloop => $self->ioloop)->steps([
506             sub {
507 0     0     my $steps = shift;
508             # fixme: check results?
509             $self->conn->call('rpcswitch.announce', {
510             workername => $workername,
511             method => $method,
512             (($args{filter}) ? (filter => $args{filter}) : ()),
513 0 0         (($args{doc}) ? (doc => $args{doc}) : ()),
    0          
514             }, $steps->next(),
515             );
516             },
517             sub {
518             #say 'call returned: ', Dumper(\@_);
519 0     0     my ($steps, $e, $r) = @_;
520 0           $done++; # reply received, stop wating
521 0 0         if ($e) {
522 0           $self->log->debug("announce got error " . Dumper($e));
523 0           $err = $e->{message};
524 0           return;
525             }
526 0           my ($res, $msg) = @$r;
527 0 0         unless ($res) {
528 0           $err = $msg;
529 0           $self->log->error("announce got res: $res msg: $msg");
530 0           return;
531             }
532 0           my $worker_id = $msg->{worker_id};
533             my $action = {
534             cb => $cb,
535             mode => $mode,
536             undocb => $args{undocb},
537             meta => $args{meta},
538 0           worker_id => $worker_id,
539             };
540 0           $self->actions->{$method} = $action;
541             $self->rpc->register(
542             $method,
543 0           sub { $self->_magic($action, @_) },
544 0           non_blocking => 1,
545             raw => 1,
546             );
547 0           $self->log->debug("succesfully announced $method");
548             }],sub {
549 0     0     ($err) = @_;
550 0           $done++;
551 0           $self->log->debug("something went wrong with announce: $err");
552 0           });
553              
554 0     0     $self->_loop(sub { !$done });
  0            
555              
556 0           return $err;
557             }
558              
559             sub rpc_ping {
560 0     0 0   my ($self, $c, $i, $rpccb) = @_;
561 0           $self->lastping(time());
562 0           return 'pong!';
563             }
564              
565             sub _magic {
566             #say '_magic: ', Dumper(\@_);
567 0     0     my ($self, $action, $con, $request, $rpccb) = @_;
568 0           my $method = $request->{method};
569 0           my $req_id = $request->{id};
570 0 0         unless ($action) {
571 0           $self->log->info("_magic for unknown action $method");
572 0           return;
573             }
574             my $rpcswitch = $request->{rpcswitch} or
575 0 0         die "no rpcswitch information?";
576 0           $rpcswitch->{worker_id} = $action->{worker_id};
577 0           my $resp = {
578             jsonrpc => '2.0',
579             id => $req_id,
580             rpcswitch => $rpcswitch,
581             };
582             my $cb1 = sub {
583 0     0     $resp->{result} = \@_;
584 0           $rpccb->($resp);
585 0           };
586 0           my @args = ($req_id, $request->{params});
587 0 0         push @args, $rpcswitch if $action->{meta};
588              
589 0           local $@;
590             # fastest to slowest?
591 0 0         if ($action->{mode} eq 'async2') {
    0          
    0          
    0          
592             my $cb2 = sub {
593             my $request = $self->{jsonobject}->encode({
594 0     0     jsonrpc => '2.0',
595             method => 'rpcswitch.result',
596             rpcswitch => $rpcswitch,
597             params => \@_,
598             });
599 0           $con->write($request);
600 0           };
601 0           eval {
602 0           $action->{cb}->(@args, $cb1, $cb2);
603             };
604 0 0         if ($@) {
605 0           $cb1->(RES_ERROR, $@);
606             }
607             } elsif ($action->{mode} eq 'async') {
608             my $cb2 = sub {
609             my $request = $self->{jsonobject}->encode({
610 0     0     jsonrpc => '2.0',
611             method => 'rpcswitch.result',
612             rpcswitch => $rpcswitch,
613             params => [ RES_OK, $req_id, @_ ],
614             });
615 0           $con->write($request);
616 0           };
617 0           eval {
618 0           $action->{cb}->(@args, $cb2);
619             };
620 0 0         if ($@) {
621 0           $cb1->(RES_ERROR, $@);
622             } else {
623 0           $cb1->(RES_WAIT, $req_id);
624             }
625             } elsif ($action->{mode} eq 'sync') {
626 0           my @outargs = eval { $action->{cb}->(@args) };
  0            
627 0 0         if ($@) {
628 0           $cb1->(RES_ERROR, $@);
629             } else {
630 0           $cb1->(RES_OK, @outargs);
631             }
632             } elsif ($action->{mode} eq 'subproc') {
633             my $cb2 = sub {
634             my $request = $self->{jsonobject}->encode({
635 0     0     jsonrpc => '2.0',
636             method => 'rpcswitch.result',
637             rpcswitch => $rpcswitch,
638             params => $_[0], # fixme: \@_?
639             });
640 0           $con->write($request);
641 0           };
642 0           eval {
643 0           $self->_subproc($cb2, $action, @args);
644             };
645 0 0         if ($@) {
646 0           $cb1->(RES_ERROR, $@);
647             } else {
648 0           $cb1->(RES_WAIT, $req_id);
649             }
650             } else {
651 0           die "unkown mode $action->{mode}";
652             }
653             }
654              
655              
656             sub _subproc {
657 0     0     my ($self, $cb, $action, $req_id, @args) = @_;
658              
659             # based on Mojo::IOLoop::Subprocess
660 0           my $ioloop = $self->ioloop;
661              
662             # Pipe for subprocess communication
663 0 0         pipe(my $reader, my $writer) or die "Can't create pipe: $!";
664              
665 0 0         die "Can't fork: $!" unless defined(my $pid = fork);
666 0 0         unless ($pid) {# Child
667 0           $self->log->debug("in child $$");;
668 0           $ioloop->reset;
669 0           close $reader; # or we won't get a sigpipe when daddy dies..
670 0           my $undo = 0;
671 0           my @outargs = eval { $action->{cb}->($req_id, @args) };
  0            
672 0 0         if ($@) {
673 0           @outargs = ( RES_ERROR, $req_id, $@ );
674             } else {
675 0           unshift @outargs, (RES_OK, $req_id);
676             }
677 0           print $writer Storable::freeze(\@outargs);
678 0           $writer->flush;
679 0           close $writer;
680             # FIXME: normal exit?
681 0           POSIX::_exit(0);
682             }
683              
684             # Parent
685 0           my $me = $$;
686 0           close $writer;
687 0           my $stream = Mojo::IOLoop::Stream->new($reader)->timeout(0);
688 0           $ioloop->stream($stream);
689 0           my $buffer = '';
690 0     0     $stream->on(read => sub { $buffer .= pop });
  0            
691             $stream->on(
692             close => sub {
693             #say "close handler!";
694 0 0   0     return unless $$ == $me;
695 0           waitpid $pid, 0;
696 0           my $tmp = eval { Storable::thaw($buffer) };
  0            
697 0 0         if ($@) {
698 0           $tmp = [ RES_ERROR, $req_id, $@ ];
699             }
700 0           $self->log->debug('subprocess results: ' . Dumper($tmp));
701 0           eval {
702 0           $cb->($tmp)
703             }; # the connection might be gone?
704 0 0         $self->log->debug("got $@ writing subprocess results") if $@;
705             }
706 0           );
707             }
708              
709             sub close {
710 0     0 0   my ($self) = @_;
711 0           $self->log->debug('closing connection');
712 0           $self->conn->close();
713 0           $self->ns->close();
714 0           %$self = ();
715             }
716              
717             # tick while Mojo::Reactor is still running and condition callback is true
718             sub _loop {
719 0 0   0     warn __PACKAGE__." recursing into IO loop" if state $looping++;
720              
721 0           my $reactor = $_[0]->ioloop->reactor;
722 0           my $err;
723              
724 0 0         if (ref $reactor eq 'Mojo::Reactor::EV') {
    0          
725              
726 0           my $active = 1;
727              
728 0   0       $active = $reactor->one_tick while $_[1]->() && $active;
729              
730             } elsif (ref $reactor eq 'Mojo::Reactor::Poll') {
731              
732 0           $reactor->{running}++;
733              
734 0   0       $reactor->one_tick while $_[1]->() && $reactor->is_running;
735              
736 0   0       $reactor->{running} &&= $reactor->{running} - 1;
737              
738             } else {
739              
740 0           $err = "unknown reactor: ".ref $reactor;
741             }
742              
743 0           $looping--;
744 0 0         die $err if $err;
745             }
746              
747              
748             #sub DESTROY {
749             # my ($self) = @_;
750             # say STDERR "destroying $self";
751             #}
752              
753             1;
754              
755             =encoding utf8
756              
757             =head1 NAME
758              
759             RPC::Switch::Client - Connect to the RPC-Switch using Mojo.
760              
761             =head1 SYNOPSIS
762              
763             use RPC::Switch::Client;
764              
765             my $client = RPC::Switch::Client->new(
766             address => ...
767             port => ...
768             who => ...
769             token => ...
770             );
771              
772             my ($status, $outargs) = $client->call(
773             method => 'test',
774             inargs => { test => 'test' },
775             );
776              
777             =head1 DESCRIPTION
778              
779             L is a class to build a client to connect to the
780             L. The client can be used to initiate and inspect rpcs as well as
781             for providing 'worker' services to the RPC-Switch.
782              
783             =head1 METHODS
784              
785             =head2 new
786              
787             $client = RPC::Switch::Client->new(%arguments);
788              
789             Class method that returns a new RPC::Switch::Client object.
790              
791             Valid arguments are:
792              
793             =over 4
794              
795             =item - address: address of the RPC-Switch.
796              
797             (default: 127.0.0.1)
798              
799             =item - port: port of the RPC-Switch
800              
801             (default 6551)
802              
803             =item - tls: connect using tls
804              
805             (default false)
806              
807             =item - tls_ca: verify server using ca
808              
809             (default undef)
810              
811             =item - tls_key: private client key
812              
813             (default undef)
814              
815             =item - tls_ca: public client certificate
816              
817             (default undef)
818              
819             =item - who: who to authenticate as.
820              
821             (required)
822              
823             =item - method: how to authenticate.
824              
825             (default: password)
826              
827             =item - token: token to authenticate with.
828              
829             (required)
830              
831             =item - debug: when true prints debugging using L
832              
833             (default: false)
834              
835             =item - json: flag wether input is json or perl.
836              
837             when true expects the inargs to be valid json, when false a perl hashref is
838             expected and json encoded. (default true)
839              
840             =item - ioloop: L object to use
841              
842             (per default the L->singleton object is used)
843              
844             =item - log: L object to use
845              
846             (per default a new L object is created)
847              
848             =item - jsonobject: json encoder/decoder object to use
849              
850             (per default a new L object is created)
851              
852             =item - timeout: how long to wait for Api calls to complete
853              
854             (default 60 seconds)
855              
856             =item - ping_timeout: after this long without a ping from the Api the
857             connection will be closed and the work() method will return
858              
859             (default: 5 minutes)
860              
861             =item - autoconnect: automatically connect to the RPC-Switch.
862              
863             (default: true)
864              
865             =back
866              
867             =head2 connect
868              
869             $connected = $client->connect();
870              
871             Connect (or reconnect) to the RPC-Switch. Returns a true value if the
872             connection succeeded.
873              
874             =head2 is_connected
875              
876             $connected = $client->is_connected();
877              
878             Returns a true value if the $client is connected.
879              
880             =head2 call
881              
882             ($status, $outargs) = $client->call(%args);
883              
884             Calls the RPC-Switch and waits for the results.
885              
886             Valid arguments are:
887              
888             =over 4
889              
890             =item - method: name of the method to call (required)
891              
892             =item - inargs: input arguments for the workflow (if any)
893              
894             =item - timeout: wait this many seconds for the method to finish
895             (optional, defaults to 5 times the Api-call timeout, so default 5 minutes)
896              
897             =back
898              
899             =head2 call_nb
900              
901             $client->call_nb(%args);
902              
903             Calls a method on the RPC-Switch and calls the provided callbacks on completion
904             of the method call.
905              
906             =over 4
907              
908             =item - waitcb: (optional) coderef that will be called when the worker
909             signals that processing may take a while. The $wait_id can be used with the
910             get_status call, $status wil be the string 'RES_WAIT'.
911              
912             ( waitcb => sub { ($status, $wait_id) = @_; ... } )
913              
914             =item - resultcb: coderef that will be called on method completion. $status
915             will be a string value, one of 'RES_OK' or 'RES_ERROR'. $outargs will be
916             the method return values or a error message, respectively.
917              
918             ( resultcb => sub { ($status, $outargs) = @_; ... } )
919              
920             =back
921              
922             =head2 get_status
923              
924             ($status, $outargs) = $client->get_status($wait_id,);
925              
926             Retrieves the status for the given $wait_id. See call_nb for a description
927             of the return values.
928              
929             =head2 ping
930              
931             $status = $client->ping($timeout);
932              
933             Tries to ping the RPC-Switch. On success return true. On failure returns
934             the undefined value, after that the client object should be undefined.
935              
936             =head2 announce
937              
938             Announces the capability to perform a method to the RPC-Switch. The
939             provided callback will be called when there is a method to be performed.
940             Returns an error when there was a problem announcing the action.
941              
942             my $err = $client->announce(
943             workername => 'me',
944             method => 'do.something',
945             cb => sub { ... },
946             );
947             die "could not announce $method?: $err" if $err;
948              
949             See L for an example.
950              
951             Valid arguments are:
952              
953             =over 4
954              
955             =item - workername: name of the worker
956              
957             (optional, defaults to client->who, processname and processid)
958              
959             =item - method: name of the method
960              
961             (required)
962              
963             =item - cb: callback to be called for the method
964              
965             Default arguments are the request_id and the contents of the JSON-RPC 2.0
966             params field.
967              
968             (required)
969              
970             =item - mode: callback mode
971              
972             (optional, default 'sync')
973              
974             Possible values:
975              
976             =over 8
977              
978             =item - 'sync': simple blocking mode, just return the results from the
979             callback. Use only for callbacks taking less than (about) a second.
980              
981             =item - 'subproc': the simple blocking callback is started in a seperate
982             process. Useful for callbacks that take a long time.
983              
984             =item - 'async': the callback gets passed another callback as the last
985             argument that is to be called on completion of the task. For advanced use
986             cases where the worker is actually more like a proxy. The (initial)
987             callback is expected to return soonish to the event loop, after setting up
988             some Mojo-callbacks.
989              
990             =back
991              
992             =item - async: backwards compatible way for specifying mode 'async'
993              
994             (optional, default false)
995              
996             =item - meta: pass RPC-Switch meta information
997              
998             The RPC-Switch meta information is passed to the callback as an extra
999             argument after the JSON-RPC 2.0 params field.
1000              
1001             =item - undocb: undo on error
1002              
1003             A callback that gets called when the original callback
1004             returns an error object or throws an error.
1005              
1006             Called with the same arguments as the original callback.
1007              
1008             (optional, only valid for mode 'subproc')
1009              
1010             =item - filter: only process a subset of the method
1011              
1012             The filter expression allows a worker to specify that it can only do the
1013             method for a certain subset of arguments. For example, for a "mkdir"
1014             action the filter expression {'host' => 'example.com'} would mean that this
1015             worker can only do mkdir on host example.com. Filter expressions are limited
1016             to simple equality tests on one or more keys, and only those keys that are
1017             allowed in the action definition. Filtering can be allowed, be mandatory or
1018             be forbidden per action.
1019              
1020             =item - doc: documentation for the method
1021              
1022             The documentation provided to the RPC-Switch can be retrieved by calling the
1023             rpcswitch.get_method_details method. Documentation is 'free-form' but the
1024             suggested format is something like:
1025              
1026             'doc' => {
1027             'description' => 'adds step to counter and returns counter; step defaults to 1',
1028             'outputs' => 'counter',
1029             'inputs' => 'counter, step'
1030             }
1031              
1032             =back
1033              
1034             =head2 work
1035              
1036             $client->work();
1037              
1038             Starts the L. Returns a non-zero value when the IOLoop was
1039             stopped due to some error condition (like a lost connection or a ping
1040             timeout).
1041              
1042             =head3 Possible work() exit codes
1043              
1044             The RPC::Switch:Client library currently defines the following exit codes:
1045              
1046             WORK_OK
1047             WORK_PING_TIMEOUT
1048             WORK_CONNECTION_CLOSED
1049              
1050             =head2 stop
1051              
1052             $client->stop($exit);
1053              
1054             Makes the work() function exit with the provided exit code.
1055              
1056             =head1 REMOTE METHOD INFORMATION
1057              
1058             Once a connection has been established to the RPC-Switch there are two
1059             methods that can provide information about the remote methods that are
1060             callable via the RPC-Switch.
1061              
1062              
1063             =over 4
1064              
1065             =item - B
1066              
1067             Produces a list of all methods that are callable by the current role with a
1068             short description text if available
1069              
1070             Example:
1071             ./rpc-switch-client rpcswitch.get_methods '{}'
1072              
1073             ...
1074             [
1075             {
1076             'foo.add' => 'adds 2 numbers'
1077             },
1078             {
1079             'foo.div' => 'undocumented method'
1080             },
1081             ...
1082             ];
1083              
1084             =item - B
1085              
1086             Gives detailed information about a specific method. Details can include the
1087             'backend' (b) method that a worker needs to provide, a short descrption (d)
1088             and contact information (c). If a worker is available then the documentation
1089             for that method from that worker is shown.
1090              
1091             Example:
1092             ./rpc-switch-client rpcswitch.get_method_details '{"method":"foo.add"}'
1093              
1094             ...
1095             {
1096             'doc' => {
1097             'description' => 'adds step to counter and returns counter; step defaults to 1',
1098             'outputs' => 'counter',
1099             'inputs' => 'counter, step'
1100             },
1101             'b' => 'bar.add',
1102             'd' => 'adds 2 numbers',
1103             'c' => 'wieger'
1104             }
1105              
1106             =back
1107              
1108             =head1 SEE ALSO
1109              
1110             =over 4
1111              
1112             =item *
1113              
1114             L, L, L: the L Web framework
1115              
1116             =item *
1117              
1118             L, L
1119              
1120             =back
1121              
1122             L: RPC-Switch
1123              
1124             =head1 ACKNOWLEDGEMENT
1125              
1126             This software has been developed with support from L.
1127             In German: Diese Software wurde mit Unterstützung von L entwickelt.
1128              
1129             =head1 THANKS
1130              
1131             =over 4
1132              
1133             =item *
1134              
1135             'greencoloured' for multiple PRs
1136              
1137             =back
1138              
1139             =head1 AUTHORS
1140              
1141             =over 4
1142              
1143             =item *
1144              
1145             Wieger Opmeer
1146              
1147             =back
1148              
1149             =head1 COPYRIGHT AND LICENSE
1150              
1151             This software is copyright (c) 2018-2022 by Wieger Opmeer.
1152              
1153             This is free software; you can redistribute it and/or modify it under
1154             the same terms as the Perl 5 programming language system itself.
1155              
1156             =cut
1157              
1158             1;