File Coverage

blib/lib/RPC/Switch/Client.pm
Criterion Covered Total %
statement 61 460 13.2
branch 1 188 0.5
condition 0 80 0.0
subroutine 21 75 28.0
pod 10 15 66.6
total 93 818 11.3


line stmt bran cond sub pod time code
1             package RPC::Switch::Client;
2 1     1   819 use Mojo::Base 'Mojo::EventEmitter';
  1         2  
  1         8  
3              
4             our $VERSION = '0.20'; # VERSION
5              
6             #
7             # Mojo's default reactor uses EV, and EV does not play nice with signals
8             # without some handholding. We either can try to detect EV and do the
9             # handholding, or try to prevent Mojo using EV.
10             #
11             BEGIN {
12 1 50   1   1917 $ENV{'MOJO_REACTOR'} = 'Mojo::Reactor::Poll' unless $ENV{'MOJO_REACTOR'};
13             }
14              
15             # more Mojolicious
16 1     1   505 use Mojo::IOLoop;
  1         159334  
  1         17  
17 1     1   44 use Mojo::IOLoop::Stream;
  1         2  
  1         7  
18 1     1   530 use Mojo::Log;
  1         12430  
  1         11  
19              
20             # standard perl
21 1     1   44 use Carp qw(croak);
  1         2  
  1         51  
22 1     1   7 use Scalar::Util qw(blessed refaddr);
  1         2  
  1         61  
23 1     1   7 use Cwd qw(realpath);
  1         2  
  1         40  
24 1     1   5 use Data::Dumper;
  1         3  
  1         49  
25 1     1   5 use Encode qw(encode_utf8 decode_utf8);
  1         2  
  1         51  
26 1     1   8 use File::Basename;
  1         1  
  1         59  
27 1     1   6 use IO::Handle;
  1         1  
  1         46  
28 1     1   7 use POSIX ();
  1         2  
  1         19  
29 1     1   5 use Scalar::Util qw(blessed refaddr);
  1         1  
  1         55  
30 1     1   741 use Storable;
  1         3362  
  1         59  
31 1     1   509 use Sys::Hostname;
  1         1000  
  1         71  
32              
33             # from cpan
34 1     1   554 use JSON::RPC2::TwoWay 0.05; # for configurable json encoder
  1         3833  
  1         30  
35             # JSON::RPC2::TwoWay depends on JSON::MaybeXS anyways, so it can be used here
36             # without adding another dependency
37 1     1   7 use JSON::MaybeXS qw();
  1         2  
  1         22  
38 1     1   520 use MojoX::NetstringStream 0.06;
  1         1267  
  1         7  
39              
40             # us
41 1     1   539 use RPC::Switch::Client::Steps;
  1         3  
  1         8  
42              
43             has [qw(
44             actions address auth cb_used channels clientid conn debug ioloop
45             json lastping log method ns ping_timeout port rpc timeout tls token
46             who
47             )];
48              
49             # keep in sync with the rpc-switch
50             use constant {
51 1         6635 RES_OK => 'RES_OK',
52             RES_WAIT => 'RES_WAIT',
53             RES_TIMEOUT => 'RES_TIMEOUT',
54             RES_ERROR => 'RES_ERROR',
55             RES_OTHER => 'RES_OTHER', # 'dunno'
56             WORK_OK => 0, # exit codes for work method
57             WORK_PING_TIMEOUT => 92,
58             WORK_CONNECTION_CLOSED => 91,
59 1     1   75 };
  1         2  
60              
61             sub new {
62 0     0 1   my ($class, %args) = @_;
63 0           my $self = $class->SUPER::new();
64              
65 0   0       my $debug = $args{debug} // 0; # or 1?
66              
67 0   0       $self->{address} = $args{address} // '127.0.0.1';
68 0           $self->{cb_used} = {}; # avoid calling cb twice in a timeout scenario
69 0           $self->{channels} = {}; # per channel hash of waitids
70 0           $self->{debug} = $debug;
71 0   0       $self->{json} = $args{json} // 1;
72             $self->{jsonobject} = $args{jsonobject} // JSON::MaybeXS->new(),
73 0   0       $self->{ping_timeout} = $args{ping_timeout} // 300;
      0        
74 0   0       $self->{ioloop} = $args{ioloop} // Mojo::IOLoop->singleton;
75             $self->{log} = $args{log}
76 0 0 0       // Mojo::Log->new(level => ($debug) ? 'debug' : 'info');
77 0   0       $self->{method} = $args{method} // 'password';
78 0   0       $self->{port} = $args{port} // 6551;
79 0   0       $self->{timeout} = $args{timeout} // 60;
80 0   0       $self->{tls} = $args{tls} // 0;
81 0           $self->{tls_ca} = $args{tls_ca};
82 0           $self->{tls_cert} = $args{tls_cert};
83 0           $self->{tls_key} = $args{tls_key};
84 0 0         $self->{token} = $args{token} or croak 'no token?';
85 0 0         $self->{who} = $args{who} or croak 'no who?';
86 0   0       $self->{autoconnect} = $args{autoconnect} // 1;
87              
88 0 0         return $self unless $self->{autoconnect};
89              
90 0           $self->connect;
91              
92 0 0         return $self if $self->{auth};
93 0           return;
94             }
95              
96             sub connect {
97 0     0 1   my $self = shift;
98              
99 0           delete $self->ioloop->{__exit__};
100 0           delete $self->{auth};
101 0           $self->{actions} = {};
102              
103             $self->on(disconnect => sub {
104 0     0     my ($self, $code) = @_;
105             #$self->{_exit} = $code;
106 0           $self->ioloop->stop;
107 0           });
108              
109             my $rpc = JSON::RPC2::TwoWay->new(
110             debug => $self->{debug},
111             json => $self->{jsonobject},
112 0 0         ) or croak 'no rpc?';
113 0     0     $rpc->register('rpcswitch.greetings', sub { $self->rpc_greetings(@_) }, notification => 1);
  0            
114 0     0     $rpc->register('rpcswitch.ping', sub { $self->rpc_ping(@_) });
  0            
115 0     0     $rpc->register('rpcswitch.channel_gone', sub { $self->rpc_channel_gone(@_) }, notification => 1);
  0            
116             $rpc->register(
117             'rpcswitch.result',
118 0     0     sub { $self->rpc_result(@_) },
119 0           by_name => 0,
120             notification => 1,
121             raw => 1
122             );
123              
124             my $clarg = {
125             address => $self->{address},
126             port => $self->{port},
127             tls => $self->{tls},
128 0           };
129 0 0         $clarg->{tls_ca} = $self->{tls_ca} if $self->{tls_ca};
130 0 0         $clarg->{tls_cert} = $self->{tls_cert} if $self->{tls_cert};
131 0 0         $clarg->{tls_key} = $self->{tls_key} if $self->{tls_key};
132              
133             my $clientid = $self->ioloop->client(
134             $clarg => sub {
135 0     0     my ($loop, $err, $stream) = @_;
136 0 0         if ($err) {
137 0           $err =~ s/\n$//s;
138 0           $self->log->error('connection to API failed: ' . $err);
139 0           $self->{auth} = 0;
140 0           return;
141             }
142 0           my $ns = MojoX::NetstringStream->new(stream => $stream);
143 0           $self->{ns} = $ns;
144             my $conn = $rpc->newconnection(
145             owner => $self,
146 0           write => sub { $ns->write(@_) },
147 0           );
148 0           $self->{conn} = $conn;
149             $ns->on(chunk => sub {
150 0           my ($ns2, $chunk) = @_;
151             #say 'got chunk: ', $chunk;
152 0           my @err = $conn->handle($chunk);
153 0 0         $self->log->info('chunk handler: ' . join(' ', grep defined, @err)) if @err;
154 0 0         $ns->close if $err[0];
155 0           });
156             $ns->on(close => sub {
157             # this cb is called during global destruction, at
158             # least on old perls where
159             # Mojo::Util::_global_destruction() won't work
160 0 0         return unless $conn;
161 0           $conn->close;
162 0           $self->log->warn('connection to rpcswitch closed');
163 0           $self->emit(disconnect => WORK_CONNECTION_CLOSED); # todo: doc
164 0           });
165 0           });
166              
167 0           $self->{rpc} = $rpc;
168 0           $self->{clientid} = $clientid;
169              
170             # handle timeout?
171             my $tmr = $self->ioloop->timer($self->{timeout} => sub {
172 0     0     my $loop = shift;
173 0           $self->log->error('timeout wating for greeting');
174 0           $loop->remove($clientid); # disconnect
175 0           $self->{auth} = 0;
176 0           });
177              
178 0           $self->log->debug('starting handshake');
179            
180             # fixme: catch signals?
181 0     0     $self->_loop(sub { not defined $self->{auth} });
  0            
182              
183 0           $self->log->debug('done with handhake?');
184              
185 0           $self->ioloop->remove($tmr);
186 0           $self->unsubscribe('disconnect');
187              
188 0           return $self->{auth};
189             }
190              
191             sub is_connected {
192 0     0 1   my $self = shift;
193 0   0       return $self->{auth} && !$self->ioloop->{__exit__};
194             }
195              
196             sub rpc_greetings {
197 0     0 0   my ($self, $c, $i) = @_;
198             RPC::Switch::Client::Steps->new(ioloop => $self->ioloop)->steps([sub{
199 0     0     my $steps = shift;
200 0 0         die "wrong api version $i->{version} (expected 1.0)" unless $i->{version} eq '1.0';
201 0           $self->log->info('got greeting from ' . $i->{who});
202 0           $c->call(
203             'rpcswitch.hello',
204             {who => $self->who, method => $self->method, token => $self->token},
205             $steps->next,
206             );
207             }, sub {
208 0     0     my ($steps, $e, $r) = @_;
209 0           my $w;
210 0 0         die "hello returned error $e->{message} ($e->{code})" if $e;
211 0 0         die 'no results from hello?' unless $r;
212 0           ($r, $w) = @$r;
213 0 0         if ($r) {
214 0           $self->log->info("hello returned: $r, $w");
215 0           $self->{auth} = 1;
216             } else {
217 0   0       $self->log->error('hello failed: ' . ($w // ''));
218 0           $self->{auth} = 0; # defined but false
219             }
220             }],sub {
221 0     0     my ($err) = @_;
222 0           $self->log->error('something went wrong in handshake: ' . $err);
223 0           $self->{auth} = '';
224 0           });
225             }
226              
227             sub call {
228 0     0 1   my ($self, %args) = @_;
229 0           my ($done, $status, $outargs);
230             $args{waitcb} = sub {
231 0     0     ($status, $outargs) = @_;
232 0 0 0       unless ($status and $status eq RES_WAIT) {
233 0   0       $self->log->error('unexpected status: ' . ($status // 'undef'));
234 0           $done++;
235 0           return;
236             }
237 0           $self->log->debug("gotta wait for $outargs");
238 0           };
239             $args{resultcb} = sub {
240 0     0     ($status, $outargs) = @_;
241 0           $done++;
242 0           };
243 0           $self->call_nb(%args);
244              
245 0     0     $self->_loop(sub { !$done });
  0            
246              
247 0           return $status, $outargs;
248             }
249              
250             sub call_nb {
251 0     0 1   my ($self, %args) = @_;
252 0 0         my $method = $args{method} or die 'no method?';
253 0           my $vtag = $args{vtag};
254 0   0       my $inargs = $args{inargs} // '{}';
255 0           my $waitcb = $args{waitcb}; # optional
256 0   0       my $rescb = $args{resultcb} // die 'no result callback?';
257 0   0       my $timeout = $args{timeout} // 0; # accommodate existing code where this didn't work
258 0           my $reqauth = $args{reqauth};
259 0           my $inargsj;
260              
261 0 0         if ($self->{json}) {
262 0           $inargsj = $inargs;
263 0           $inargs = $self->{jsonobject}->decode($inargs);
264 0 0         croak 'inargs is not a json object' unless ref $inargs eq 'HASH';
265             } else {
266 0 0         croak 'inargs should be a hashref' unless ref $inargs eq 'HASH';
267             # test encoding
268 0           $inargsj = $self->{jsonobject}->encode($inargs);
269 0 0         if ($reqauth) {
270             }
271             }
272              
273 0 0         if ($reqauth) {
274 0 0         if (blessed($reqauth)) {
275 0 0         if ($reqauth->can('_to_reqauth')) {
276             # duck typing in action
277 0           $reqauth = $reqauth->_to_reqauth();
278             } else {
279 0           croak "Don't know how to convert $reqauth to reqauth hash";
280             }
281             }
282 0 0         croak 'reqauth should be a hashref' unless ref $reqauth eq 'HASH';
283             }
284              
285 0           my $req = {
286             rescb => $rescb,
287             };
288              
289 0 0         if ($timeout > 0) {
290             $req->{tmr} = $self->ioloop->timer($timeout => sub {
291 0 0   0     my $rescb = delete $req->{rescb} or return;
292 0           $rescb->(RES_TIMEOUT, "timed out after $timeout seconds");
293 0           });
294             }
295              
296 0           $inargsj = decode_utf8($inargsj);
297 0 0         $self->log->debug("calling $method with '" . $inargsj . "'" . (($vtag) ? " (vtag $vtag)" : ''));
298              
299             RPC::Switch::Client::Steps->new(ioloop => $self->ioloop)->steps([sub{
300 0     0     my $steps = shift;
301 0 0         $self->conn->callraw({
302             method => $method,
303             params => $inargs,
304             ($reqauth ? (rpcswitch => { vcookie => 'eatme', reqauth => $reqauth }) : ()),
305             }, $steps->next );
306             }, sub {
307             #$self->log->debug('got response:' . Dumper(\@_));
308 0     0     my ($steps, $e, $r) = @_;
309 0 0         if ($e) {
310 0           $e = $e->{error};
311 0           $self->log->error("call returned error: $e->{message} ($e->{code})");
312 0 0         $rescb->(RES_ERROR, "$e->{message} ($e->{code})") if $rescb;
313 0           return;
314             }
315 0           my ($rescb, $tmr) = @{$req}{qw(rescb tmr)};
  0            
316 0 0         return unless $rescb; # $rescb is undef if a timeout happeded
317 0           my ($status, $outargs) = @{$r->{result}};
  0            
318 0 0         if ($status eq RES_WAIT) {
319 0           my $vci = $r->{rpcswitch}->{vci};
320 0 0         unless ($vci) {
321 0           $self->log->error("missing rpcswitch vci after RES_WAIT");
322 0           return;
323             }
324              
325             # note the relation to the channel so we can throw an error if
326             # the channel disappears
327             # outargs should contain waitid
328             # autovivification ftw?
329 0           $self->{channels}->{$vci}->{$outargs} = $req;
330 0 0         $waitcb->($status, $outargs) if $waitcb;
331             } else {
332             $outargs = $self->{jsonobject}->encode($outargs)
333 0 0 0       if $self->{json} and ref $outargs;
334 0           $rescb->($status, $outargs);
335 0 0         $self->ioloop->remove($tmr) if $tmr;
336             }
337 0           return;
338             }], sub {
339 0     0     my ($err) = @_;
340 0           $self->log->error("Something went wrong in call_nb: $err");
341 0           my ($rescb, $tmr) = @{$req}{qw(rescb tmr)};
  0            
342 0 0         $rescb->(RES_ERROR, $err) if $rescb;
343 0 0         $self->ioloop->remove($tmr) if $tmr;
344 0           return @_;
345 0           });
346 0           return;
347             }
348              
349             sub get_status {
350 0     0 1   my ($self, $wait_id, $notify) = @_;
351            
352 0           my ($ns, $id) = split /:/, $wait_id, 2;
353            
354 0 0         die "no namespace in waitid?" unless $ns;
355            
356 0 0         my $inargs = {
357             wait_id => $wait_id,
358             notify => ($notify ? JSON->true : JSON->false),
359             };
360             # meh:
361 0 0         $inargs = $self->{jsonobject}->encode($inargs) if $self->{json};
362              
363             # fixme: reuse call?
364 0           my ($done, $status, $outargs);
365             my %args = (
366             method => "$ns._get_status",
367             inargs => $inargs,
368             waitcb => sub {
369 0     0     ($status, $outargs) = @_;
370 0 0 0       die "unexpected status" unless $status and $status eq RES_WAIT;
371 0 0         $done++ unless $notify;
372             },
373             resultcb => sub {
374 0     0     ($status, $outargs) = @_;
375 0           $done++;
376             },
377 0           );
378 0           $self->call_nb(%args);
379              
380 0     0     $self->_loop(sub { !$done });
  0            
381              
382 0           return $status, $outargs;
383             }
384              
385             sub rpc_result {
386 0     0 0   my ($self, $c, $r) = @_;
387             #$self->log->debug('got result: ' . Dumper($r));
388 0           my ($status, $id, $outargs) = @{$r->{params}};
  0            
389 0 0         return unless $id;
390 0           my $vci = $r->{rpcswitch}->{vci};
391 0 0         return unless $vci;
392 0           my $req = delete $self->{channels}->{$vci}->{$id};
393 0 0         return unless $req;
394 0           my ($rescb, $tmr) = @{$req}{qw(rescb tmr)};
  0            
395 0 0         return unless $rescb;
396 0 0         $self->ioloop->remove($tmr) if $tmr;
397             $outargs = $self->{jsonobject}->encode($outargs)
398 0 0 0       if $self->{json} and ref $outargs;
399 0           $rescb->($status, $outargs);
400 0           return;
401             }
402              
403             sub rpc_channel_gone {
404 0     0 0   my ($self, $c, $a) = @_;
405 0           my $ch = $a->{channel};
406 0           $self->log->debug("got channel_gone: $ch");
407 0 0         return unless $ch;
408 0           my $wl = delete $self->{channels}->{$ch};
409 0 0         return unless $wl;
410 0           for (values %$wl) {
411 0           my ($rescb, $tmr) = @{$_}{qw(rescb tmr)};
  0            
412 0 0         $self->ioloop->remove($tmr) if $tmr;
413 0 0         $rescb->(RES_ERROR, 'channel gone') if $rescb;
414             }
415 0           return;
416             }
417              
418             sub ping {
419 0     0 1   my ($self, $timeout) = @_;
420              
421 0   0       $timeout //= $self->timeout;
422 0           my ($done, $ret);
423              
424             $self->ioloop->timer($timeout => sub {
425 0     0     $done++;
426 0           });
427              
428             $self->conn->call('rpcswitch.ping', {}, sub {
429 0     0     my ($e, $r) = @_;
430 0 0 0       if (not $e and $r and $r =~ /pong/) {
      0        
431 0           $ret = 1;
432             } else {
433 0           %$self = ();
434             }
435 0           $done++;
436 0           });
437              
438             # we could recurse here
439 0     0     $self->_loop(sub { !$done });
  0            
440              
441 0           return $ret;
442             }
443              
444             sub work {
445 0     0 1   my ($self, $prepare) = @_;
446              
447 0           my $pt = $self->ping_timeout;
448 0           my $tmr;
449             $tmr = $self->ioloop->recurring($pt => sub {
450 0     0     my $ioloop = shift;
451 0   0       $self->log->debug('in ping_timeout timer: lastping: '
452             . ($self->lastping // 0) . ' limit: ' . (time - $pt) );
453 0 0 0       return if ($self->lastping // 0) > time - $pt;
454 0           $self->log->error('ping timeout');
455 0           $ioloop->remove($self->clientid);
456 0           $ioloop->remove($tmr);
457 0           $ioloop->{__exit__} = WORK_PING_TIMEOUT; # todo: doc
458 0           $ioloop->stop;
459 0 0         }) if $pt > 0;
460             $self->on(disconnect => sub {
461 0     0     my ($self, $code) = @_;
462 0           $self->ioloop->{__exit__} = $code;
463 0           $self->ioloop->stop;
464 0           });
465 0 0         return 0 if $prepare;
466 0           $self->ioloop->{__exit__} = WORK_OK;
467 0           $self->log->debug(blessed($self) . ' starting work');
468 0 0         $self->ioloop->start unless $self->ioloop->is_running;
469 0           $self->log->debug(blessed($self) . ' done?');
470 0 0         $self->ioloop->remove($tmr) if $tmr;
471              
472 0           return $self->ioloop->{__exit__};
473             }
474              
475             sub stop {
476 0     0 1   my ($self, $exit) = @_;
477 0           $self->ioloop->{__exit__} = $exit;
478 0           $self->ioloop->stop;
479             }
480              
481             sub announce {
482 0     0 1   my ($self, %args) = @_;
483 0 0         my $method = $args{method} or croak 'no method?';
484 0 0         my $cb = $args{cb} or croak 'no cb?';
485             #my $async = $args{async} // 0;
486 0 0 0       my $mode = $args{mode} // (($args{async}) ? 'async' : 'sync');
487 0 0         croak "unknown callback mode $mode" unless $mode =~ /^(subproc|async|async2|sync)$/;
488 0           my $host = hostname;
489 0   0       my $workername = $args{workername} // "$self->{who} $host $0 $$";
490            
491 0 0         croak "already have action $method" if $self->actions->{$method};
492            
493 0           my ($done, $err);
494             RPC::Switch::Client::Steps->new(ioloop => $self->ioloop)->steps([
495             sub {
496 0     0     my $steps = shift;
497             # fixme: check results?
498             $self->conn->call('rpcswitch.announce', {
499             workername => $workername,
500             method => $method,
501             (($args{filter}) ? (filter => $args{filter}) : ()),
502 0 0         (($args{doc}) ? (doc => $args{doc}) : ()),
    0          
503             }, $steps->next(),
504             );
505             },
506             sub {
507             #say 'call returned: ', Dumper(\@_);
508 0     0     my ($steps, $e, $r) = @_;
509 0           $done++; # reply received, stop wating
510 0 0         if ($e) {
511 0           $self->log->debug("announce got error " . Dumper($e));
512 0           $err = $e->{message};
513 0           return;
514             }
515 0           my ($res, $msg) = @$r;
516 0 0         unless ($res) {
517 0           $err = $msg;
518 0           $self->log->error("announce got res: $res msg: $msg");
519 0           return;
520             }
521 0           my $worker_id = $msg->{worker_id};
522             my $action = {
523             cb => $cb,
524             mode => $mode,
525             undocb => $args{undocb},
526             meta => $args{meta},
527 0           worker_id => $worker_id,
528             };
529 0           $self->actions->{$method} = $action;
530             $self->rpc->register(
531             $method,
532 0           sub { $self->_magic($action, @_) },
533 0           non_blocking => 1,
534             raw => 1,
535             );
536 0           $self->log->debug("succesfully announced $method");
537             }],sub {
538 0     0     ($err) = @_;
539 0           $done++;
540 0           $self->log->debug("something went wrong with announce: $err");
541 0           });
542              
543 0     0     $self->_loop(sub { !$done });
  0            
544              
545 0           return $err;
546             }
547              
548             sub rpc_ping {
549 0     0 0   my ($self, $c, $i, $rpccb) = @_;
550 0           $self->lastping(time());
551 0           return 'pong!';
552             }
553              
554             sub _magic {
555             #say '_magic: ', Dumper(\@_);
556 0     0     my ($self, $action, $con, $request, $rpccb) = @_;
557 0           my $method = $request->{method};
558 0           my $req_id = $request->{id};
559 0 0         unless ($action) {
560 0           $self->log->info("_magic for unknown action $method");
561 0           return;
562             }
563             my $rpcswitch = $request->{rpcswitch} or
564 0 0         die "no rpcswitch information?";
565 0           $rpcswitch->{worker_id} = $action->{worker_id};
566 0           my $resp = {
567             jsonrpc => '2.0',
568             id => $req_id,
569             rpcswitch => $rpcswitch,
570             };
571             my $cb1 = sub {
572 0     0     $resp->{result} = \@_;
573 0           $rpccb->($resp);
574 0           };
575 0           my @args = ($req_id, $request->{params});
576 0 0         push @args, $rpcswitch if $action->{meta};
577              
578 0           local $@;
579             # fastest to slowest?
580 0 0         if ($action->{mode} eq 'async2') {
    0          
    0          
    0          
581             my $cb2 = sub {
582             my $request = $self->{jsonobject}->encode({
583 0     0     jsonrpc => '2.0',
584             method => 'rpcswitch.result',
585             rpcswitch => $rpcswitch,
586             params => \@_,
587             });
588 0           $con->write($request);
589 0           };
590 0           eval {
591 0           $action->{cb}->(@args, $cb1, $cb2);
592             };
593 0 0         if ($@) {
594 0           $cb1->(RES_ERROR, $@);
595             }
596             } elsif ($action->{mode} eq 'async') {
597             my $cb2 = sub {
598             my $request = $self->{jsonobject}->encode({
599 0     0     jsonrpc => '2.0',
600             method => 'rpcswitch.result',
601             rpcswitch => $rpcswitch,
602             params => [ RES_OK, $req_id, @_ ],
603             });
604 0           $con->write($request);
605 0           };
606 0           eval {
607 0           $action->{cb}->(@args, $cb2);
608             };
609 0 0         if ($@) {
610 0           $cb1->(RES_ERROR, $@);
611             } else {
612 0           $cb1->(RES_WAIT, $req_id);
613             }
614             } elsif ($action->{mode} eq 'sync') {
615 0           my @outargs = eval { $action->{cb}->(@args) };
  0            
616 0 0         if ($@) {
617 0           $cb1->(RES_ERROR, $@);
618             } else {
619 0           $cb1->(RES_OK, @outargs);
620             }
621             } elsif ($action->{mode} eq 'subproc') {
622             my $cb2 = sub {
623             my $request = $self->{jsonobject}->encode({
624 0     0     jsonrpc => '2.0',
625             method => 'rpcswitch.result',
626             rpcswitch => $rpcswitch,
627             params => $_[0], # fixme: \@_?
628             });
629 0           $con->write($request);
630 0           };
631 0           eval {
632 0           $self->_subproc($cb2, $action, @args);
633             };
634 0 0         if ($@) {
635 0           $cb1->(RES_ERROR, $@);
636             } else {
637 0           $cb1->(RES_WAIT, $req_id);
638             }
639             } else {
640 0           die "unkown mode $action->{mode}";
641             }
642             }
643              
644              
645             sub _subproc {
646 0     0     my ($self, $cb, $action, $req_id, @args) = @_;
647              
648             # based on Mojo::IOLoop::Subprocess
649 0           my $ioloop = $self->ioloop;
650              
651             # Pipe for subprocess communication
652 0 0         pipe(my $reader, my $writer) or die "Can't create pipe: $!";
653              
654 0 0         die "Can't fork: $!" unless defined(my $pid = fork);
655 0 0         unless ($pid) {# Child
656 0           $self->log->debug("in child $$");;
657 0           $ioloop->reset;
658 0           close $reader; # or we won't get a sigpipe when daddy dies..
659 0           my $undo = 0;
660 0           my @outargs = eval { $action->{cb}->($req_id, @args) };
  0            
661 0 0         if ($@) {
662 0           @outargs = ( RES_ERROR, $req_id, $@ );
663             } else {
664 0           unshift @outargs, (RES_OK, $req_id);
665             }
666 0           print $writer Storable::freeze(\@outargs);
667 0           $writer->flush;
668 0           close $writer;
669             # FIXME: normal exit?
670 0           POSIX::_exit(0);
671             }
672              
673             # Parent
674 0           my $me = $$;
675 0           close $writer;
676 0           my $stream = Mojo::IOLoop::Stream->new($reader)->timeout(0);
677 0           $ioloop->stream($stream);
678 0           my $buffer = '';
679 0     0     $stream->on(read => sub { $buffer .= pop });
  0            
680             $stream->on(
681             close => sub {
682             #say "close handler!";
683 0 0   0     return unless $$ == $me;
684 0           waitpid $pid, 0;
685 0           my $tmp = eval { Storable::thaw($buffer) };
  0            
686 0 0         if ($@) {
687 0           $tmp = [ RES_ERROR, $req_id, $@ ];
688             }
689 0           $self->log->debug('subprocess results: ' . Dumper($tmp));
690 0           eval {
691 0           $cb->($tmp)
692             }; # the connection might be gone?
693 0 0         $self->log->debug("got $@ writing subprocess results") if $@;
694             }
695 0           );
696             }
697              
698             sub close {
699 0     0 0   my ($self) = @_;
700 0           $self->log->debug('closing connection');
701 0           $self->conn->close();
702 0           $self->ns->close();
703 0           %$self = ();
704             }
705              
706             # tick while Mojo::Reactor is still running and condition callback is true
707             sub _loop {
708 0 0   0     warn __PACKAGE__." recursing into IO loop" if state $looping++;
709              
710 0           my $reactor = $_[0]->ioloop->reactor;
711 0           my $err;
712              
713 0 0         if (ref $reactor eq 'Mojo::Reactor::EV') {
    0          
714              
715 0           my $active = 1;
716              
717 0   0       $active = $reactor->one_tick while $_[1]->() && $active;
718              
719             } elsif (ref $reactor eq 'Mojo::Reactor::Poll') {
720              
721 0           $reactor->{running}++;
722              
723 0   0       $reactor->one_tick while $_[1]->() && $reactor->is_running;
724              
725 0   0       $reactor->{running} &&= $reactor->{running} - 1;
726              
727             } else {
728              
729 0           $err = "unknown reactor: ".ref $reactor;
730             }
731              
732 0           $looping--;
733 0 0         die $err if $err;
734             }
735              
736              
737             #sub DESTROY {
738             # my ($self) = @_;
739             # say STDERR "destroying $self";
740             #}
741              
742             1;
743              
744             =encoding utf8
745              
746             =head1 NAME
747              
748             RPC::Switch::Client - Connect to the RPC-Switch using Mojo.
749              
750             =head1 SYNOPSIS
751              
752             use RPC::Switch::Client;
753              
754             my $client = RPC::Switch::Client->new(
755             address => ...
756             port => ...
757             who => ...
758             token => ...
759             );
760              
761             my ($status, $outargs) = $client->call(
762             method => 'test',
763             inargs => { test => 'test' },
764             );
765              
766             =head1 DESCRIPTION
767              
768             L is a class to build a client to connect to the
769             L. The client can be used to initiate and inspect rpcs as well as
770             for providing 'worker' services to the RPC-Switch.
771              
772             =head1 METHODS
773              
774             =head2 new
775              
776             $client = RPC::Switch::Client->new(%arguments);
777              
778             Class method that returns a new RPC::Switch::Client object.
779              
780             Valid arguments are:
781              
782             =over 4
783              
784             =item - address: address of the RPC-Switch.
785              
786             (default: 127.0.0.1)
787              
788             =item - port: port of the RPC-Switch
789              
790             (default 6551)
791              
792             =item - tls: connect using tls
793              
794             (default false)
795              
796             =item - tls_ca: verify server using ca
797              
798             (default undef)
799              
800             =item - tls_key: private client key
801              
802             (default undef)
803              
804             =item - tls_ca: public client certificate
805              
806             (default undef)
807              
808             =item - who: who to authenticate as.
809              
810             (required)
811              
812             =item - method: how to authenticate.
813              
814             (default: password)
815              
816             =item - token: token to authenticate with.
817              
818             (required)
819              
820             =item - debug: when true prints debugging using L
821              
822             (default: false)
823              
824             =item - json: flag wether input is json or perl.
825              
826             when true expects the inargs to be valid json, when false a perl hashref is
827             expected and json encoded. (default true)
828              
829             =item - ioloop: L object to use
830              
831             (per default the L->singleton object is used)
832              
833             =item - log: L object to use
834              
835             (per default a new L object is created)
836              
837             =item - jsonobject: json encoder/decoder object to use
838              
839             (per default a new L object is created)
840              
841             =item - timeout: how long to wait for Api calls to complete
842              
843             (default 60 seconds)
844              
845             =item - ping_timeout: after this long without a ping from the Api the
846             connection will be closed and the work() method will return
847              
848             (default: 5 minutes)
849              
850             =item - autoconnect: automatically connect to the RPC-Switch.
851              
852             (default: true)
853              
854             =back
855              
856             =head2 connect
857              
858             $connected = $client->connect();
859              
860             Connect (or reconnect) to the RPC-Switch. Returns a true value if the
861             connection succeeded.
862              
863             =head2 is_connected
864              
865             $connected = $client->is_connected();
866              
867             Returns a true value if the $client is connected.
868              
869             =head2 call
870              
871             ($status, $outargs) = $client->call(%args);
872              
873             Calls the RPC-Switch and waits for the results.
874              
875             Valid arguments are:
876              
877             =over 4
878              
879             =item - method: name of the method to call (required)
880              
881             =item - inargs: input arguments for the workflow (if any)
882              
883             =item - timeout: wait this many seconds for the method to finish
884             (optional, defaults to 5 times the Api-call timeout, so default 5 minutes)
885              
886             =back
887              
888             =head2 call_nb
889              
890             $client->call_nb(%args);
891              
892             Calls a method on the RPC-Switch and calls the provided callbacks on completion
893             of the method call.
894              
895             =over 4
896              
897             =item - waitcb: (optional) coderef that will be called when the worker
898             signals that processing may take a while. The $wait_id can be used with the
899             get_status call, $status wil be the string 'RES_WAIT'.
900              
901             ( waitcb => sub { ($status, $wait_id) = @_; ... } )
902              
903             =item - resultcb: coderef that will be called on method completion. $status
904             will be a string value, one of 'RES_OK' or 'RES_ERROR'. $outargs will be
905             the method return values or a error message, respectively.
906              
907             ( resultcb => sub { ($status, $outargs) = @_; ... } )
908              
909             =back
910              
911             =head2 get_status
912              
913             ($status, $outargs) = $client->get_status($wait_id,);
914              
915             Retrieves the status for the given $wait_id. See call_nb for a description
916             of the return values.
917              
918             =head2 ping
919              
920             $status = $client->ping($timeout);
921              
922             Tries to ping the RPC-Switch. On success return true. On failure returns
923             the undefined value, after that the client object should be undefined.
924              
925             =head2 announce
926              
927             Announces the capability to perform a method to the RPC-Switch. The
928             provided callback will be called when there is a method to be performed.
929             Returns an error when there was a problem announcing the action.
930              
931             my $err = $client->announce(
932             workername => 'me',
933             method => 'do.something',
934             cb => sub { ... },
935             );
936             die "could not announce $method?: $err" if $err;
937              
938             See L for an example.
939              
940             Valid arguments are:
941              
942             =over 4
943              
944             =item - workername: name of the worker
945              
946             (optional, defaults to client->who, processname and processid)
947              
948             =item - method: name of the method
949              
950             (required)
951              
952             =item - cb: callback to be called for the method
953              
954             Default arguments are the request_id and the contents of the JSON-RPC 2.0
955             params field.
956              
957             (required)
958              
959             =item - mode: callback mode
960              
961             (optional, default 'sync')
962              
963             Possible values:
964              
965             =over 8
966              
967             =item - 'sync': simple blocking mode, just return the results from the
968             callback. Use only for callbacks taking less than (about) a second.
969              
970             =item - 'subproc': the simple blocking callback is started in a seperate
971             process. Useful for callbacks that take a long time.
972              
973             =item - 'async': the callback gets passed another callback as the last
974             argument that is to be called on completion of the task. For advanced use
975             cases where the worker is actually more like a proxy. The (initial)
976             callback is expected to return soonish to the event loop, after setting up
977             some Mojo-callbacks.
978              
979             =back
980              
981             =item - async: backwards compatible way for specifying mode 'async'
982              
983             (optional, default false)
984              
985             =item - meta: pass RPC-Switch meta information
986              
987             The RPC-Switch meta information is passed to the callback as an extra
988             argument after the JSON-RPC 2.0 params field.
989              
990             =item - undocb: undo on error
991              
992             A callback that gets called when the original callback
993             returns an error object or throws an error.
994              
995             Called with the same arguments as the original callback.
996              
997             (optional, only valid for mode 'subproc')
998              
999             =item - filter: only process a subset of the method
1000              
1001             The filter expression allows a worker to specify that it can only do the
1002             method for a certain subset of arguments. For example, for a "mkdir"
1003             action the filter expression {'host' => 'example.com'} would mean that this
1004             worker can only do mkdir on host example.com. Filter expressions are limited
1005             to simple equality tests on one or more keys, and only those keys that are
1006             allowed in the action definition. Filtering can be allowed, be mandatory or
1007             be forbidden per action.
1008              
1009             =item - doc: documentation for the method
1010              
1011             The documentation provided to the RPC-Switch can be retrieved by calling the
1012             rpcswitch.get_method_details method. Documentation is 'free-form' but the
1013             suggested format is something like:
1014              
1015             'doc' => {
1016             'description' => 'adds step to counter and returns counter; step defaults to 1',
1017             'outputs' => 'counter',
1018             'inputs' => 'counter, step'
1019             }
1020              
1021             =back
1022              
1023             =head2 work
1024              
1025             $client->work();
1026              
1027             Starts the L. Returns a non-zero value when the IOLoop was
1028             stopped due to some error condition (like a lost connection or a ping
1029             timeout).
1030              
1031             =head3 Possible work() exit codes
1032              
1033             The RPC::Switch:Client library currently defines the following exit codes:
1034              
1035             WORK_OK
1036             WORK_PING_TIMEOUT
1037             WORK_CONNECTION_CLOSED
1038              
1039             =head2 stop
1040              
1041             $client->stop($exit);
1042              
1043             Makes the work() function exit with the provided exit code.
1044              
1045             =head1 REMOTE METHOD INFORMATION
1046              
1047             Once a connection has been established to the RPC-Switch there are two
1048             methods that can provide information about the remote methods that are
1049             callable via the RPC-Switch.
1050              
1051              
1052             =over 4
1053              
1054             =item - B
1055              
1056             Produces a list of all methods that are callable by the current role with a
1057             short description text if available
1058              
1059             Example:
1060             ./rpc-switch-client rpcswitch.get_methods '{}'
1061              
1062             ...
1063             [
1064             {
1065             'foo.add' => 'adds 2 numbers'
1066             },
1067             {
1068             'foo.div' => 'undocumented method'
1069             },
1070             ...
1071             ];
1072              
1073             =item - B
1074              
1075             Gives detailed information about a specific method. Details can include the
1076             'backend' (b) method that a worker needs to provide, a short descrption (d)
1077             and contact information (c). If a worker is available then the documentation
1078             for that method from that worker is shown.
1079              
1080             Example:
1081             ./rpc-switch-client rpcswitch.get_method_details '{"method":"foo.add"}'
1082              
1083             ...
1084             {
1085             'doc' => {
1086             'description' => 'adds step to counter and returns counter; step defaults to 1',
1087             'outputs' => 'counter',
1088             'inputs' => 'counter, step'
1089             },
1090             'b' => 'bar.add',
1091             'd' => 'adds 2 numbers',
1092             'c' => 'wieger'
1093             }
1094              
1095             =back
1096              
1097             =head1 SEE ALSO
1098              
1099             =over 4
1100              
1101             =item *
1102              
1103             L, L, L: the L Web framework
1104              
1105             =item *
1106              
1107             L, L
1108              
1109             =back
1110              
1111             L: RPC-Switch
1112              
1113             =head1 ACKNOWLEDGEMENT
1114              
1115             This software has been developed with support from L.
1116             In German: Diese Software wurde mit Unterstützung von L entwickelt.
1117              
1118             =head1 AUTHORS
1119              
1120             =over 4
1121              
1122             =item *
1123              
1124             Wieger Opmeer
1125              
1126             =back
1127              
1128             =head1 COPYRIGHT AND LICENSE
1129              
1130             This software is copyright (c) 2018 by Wieger Opmeer.
1131              
1132             This is free software; you can redistribute it and/or modify it under
1133             the same terms as the Perl 5 programming language system itself.
1134              
1135             =cut
1136              
1137             1;