File Coverage

blib/lib/JobCenter/Client/Mojo.pm
Criterion Covered Total %
statement 55 515 10.6
branch 1 208 0.4
condition 0 97 0.0
subroutine 19 89 21.3
pod 13 19 68.4
total 88 928 9.4


line stmt bran cond sub pod time code
1             package JobCenter::Client::Mojo;
2 1     1   852 use Mojo::Base 'Mojo::EventEmitter';
  1         3  
  1         8  
3              
4             our $VERSION = '0.44'; # VERSION
5              
6             #
7             # Mojo's default reactor uses EV, and EV does not play nice with signals
8             # without some handholding. We either can try to detect EV and do the
9             # handholding, or try to prevent Mojo using EV.
10             #
11             BEGIN {
12 1 50   1   1931 $ENV{'MOJO_REACTOR'} = 'Mojo::Reactor::Poll' unless $ENV{'MOJO_REACTOR'};
13             }
14              
15             # more Mojolicious
16 1     1   524 use Mojo::IOLoop;
  1         161177  
  1         16  
17 1     1   43 use Mojo::IOLoop::Stream;
  1         2  
  1         6  
18 1     1   527 use Mojo::Log;
  1         12503  
  1         10  
19              
20             # standard perl
21 1     1   41 use Carp qw(croak);
  1         3  
  1         48  
22 1     1   6 use Cwd qw(realpath);
  1         3  
  1         51  
23 1     1   7 use Data::Dumper;
  1         3  
  1         51  
24 1     1   7 use Encode qw(encode_utf8 decode_utf8);
  1         2  
  1         43  
25 1     1   6 use File::Basename;
  1         2  
  1         56  
26 1     1   5 use IO::Handle;
  1         2  
  1         47  
27 1     1   6 use POSIX ();
  1         2  
  1         15  
28 1     1   790 use Storable;
  1         3357  
  1         59  
29 1     1   555 use Sys::Hostname;
  1         1018  
  1         73  
30              
31             # from cpan
32 1     1   509 use JSON::RPC2::TwoWay 0.05;
  1         3871  
  1         31  
33             # JSON::RPC2::TwoWay depends on JSON::MaybeXS anyways, so it can be used here
34             # without adding another dependency
35 1     1   6 use JSON::MaybeXS qw();
  1         3  
  1         21  
36 1     1   537 use MojoX::NetstringStream 0.06; # for the enhanced close
  1         1237  
  1         8  
37              
38             # us
39 1     1   602 use JobCenter::Client::Mojo::Steps;
  1         3  
  1         8  
40              
41             has [qw(
42             actions address auth clientid conn debug ioloop jobs json
43             lastping log method ns ping_timeout port rpc timeout tls token who
44             )];
45              
46             # keep in sync with the jobcenter
47             use constant {
48 1         8219 WORK_OK => 0, # exit codes for work method
49             WORK_PING_TIMEOUT => 92,
50             WORK_CONNECTION_CLOSED => 91,
51 1     1   92 };
  1         1  
52              
53             sub new {
54 0     0 1   my ($class, %args) = @_;
55 0           my $self = $class->SUPER::new();
56              
57 0   0       my $address = $args{address} // '127.0.0.1';
58 0   0       my $debug = $args{debug} // 0; # or 1?
59 0   0       $self->{ioloop} = $args{ioloop} // Mojo::IOLoop->singleton;
60 0   0       my $json = $args{json} // 1;
61 0 0 0       my $log = $args{log} // Mojo::Log->new(level => ($debug) ? 'debug' : 'info');
62 0   0       my $method = $args{method} // 'password';
63 0   0       my $port = $args{port} // 6522;
64 0   0       my $timeout = $args{timeout} // 60;
65 0   0       my $tls = $args{tls} // 0;
66 0           my $tls_ca = $args{tls_ca};
67 0           my $tls_cert = $args{tls_cert};
68 0           my $tls_key = $args{tls_key};
69 0 0         my $token = $args{token} or croak 'no token?';
70 0 0         my $who = $args{who} or croak 'no who?';
71              
72 0           $self->{address} = $address;
73 0   0       $self->{debug} = $args{debug} // 1;
74 0           $self->{jobs} = {};
75 0           $self->{json} = $json;
76             $self->{jsonobject} = $args{jsonobject} // JSON::MaybeXS->new(utf8 => 1),
77 0   0       $self->{ping_timeout} = $args{ping_timeout} // 300;
      0        
78 0           $self->{log} = $log;
79 0           $self->{method} = $method;
80 0           $self->{port} = $port;
81 0           $self->{timeout} = $timeout;
82 0           $self->{tls} = $tls;
83 0           $self->{tls_ca} = $tls_ca;
84 0           $self->{tls_cert} = $tls_cert;
85 0           $self->{tls_key} = $tls_key;
86 0           $self->{token} = $token;
87 0           $self->{who} = $who;
88 0   0       $self->{autoconnect} = ($args{autoconnect} //= 1);
89              
90 0 0         return $self if !$args{autoconnect};
91              
92 0           $self->connect;
93              
94 0 0         return $self if $self->{auth};
95 0           return;
96             }
97              
98             sub connect {
99 0     0 1   my $self = shift;
100              
101 0           delete $self->ioloop->{__exit__};
102 0           delete $self->{auth};
103 0           $self->{actions} = {};
104              
105             $self->on(disconnect => sub {
106 0     0     my ($self, $code) = @_;
107             #$self->{_exit} = $code;
108 0           $self->ioloop->stop;
109 0           });
110              
111             my $rpc = JSON::RPC2::TwoWay->new(
112             debug => $self->{debug},
113             json => $self->{jsonobject},
114 0 0         ) or croak 'no rpc?';
115 0     0     $rpc->register('greetings', sub { $self->rpc_greetings(@_) }, notification => 1);
  0            
116 0     0     $rpc->register('job_done', sub { $self->rpc_job_done(@_) }, notification => 1);
  0            
117 0     0     $rpc->register('ping', sub { $self->rpc_ping(@_) });
  0            
118 0     0     $rpc->register('task_ready', sub { $self->rpc_task_ready(@_) }, notification => 1);
  0            
119              
120             my $clarg = {
121             address => $self->{address},
122             port => $self->{port},
123             tls => $self->{tls},
124 0           };
125 0 0         $clarg->{tls_ca} = $self->{tls_ca} if $self->{tls_ca};
126 0 0         $clarg->{tls_cert} = $self->{tls_cert} if $self->{tls_cert};
127 0 0         $clarg->{tls_key} = $self->{tls_key} if $self->{tls_key};
128              
129             my $clientid = $self->ioloop->client(
130             $clarg => sub {
131 0     0     my ($loop, $err, $stream) = @_;
132 0 0         if ($err) {
133 0           $err =~ s/\n$//s;
134 0           $self->log->info('connection to API failed: ' . $err);
135 0           $self->{auth} = 0;
136 0           return;
137             }
138 0           my $ns = MojoX::NetstringStream->new(stream => $stream);
139 0           $self->{ns} = $ns;
140             my $conn = $rpc->newconnection(
141             owner => $self,
142 0           write => sub { $ns->write(@_) },
143 0           );
144 0           $self->{conn} = $conn;
145             $ns->on(chunk => sub {
146 0           my ($ns2, $chunk) = @_;
147             #say 'got chunk: ', $chunk;
148 0           my @err = $conn->handle($chunk);
149 0 0         $self->log->debug('chunk handler: ' . join(' ', grep defined, @err)) if @err;
150 0 0         $ns->close if $err[0];
151 0           });
152             $ns->on(close => sub {
153             # this cb is called during global destruction, at
154             # least on old perls where
155             # Mojo::Util::_global_destruction() won't work
156 0 0         return unless $conn;
157 0           $conn->close;
158 0           $self->log->info('connection to API closed');
159 0           $self->emit(disconnect => WORK_CONNECTION_CLOSED); # todo doc
160 0           });
161 0           });
162              
163 0           $self->{rpc} = $rpc;
164 0           $self->{clientid} = $clientid;
165              
166             # handle timeout?
167             my $tmr = $self->ioloop->timer($self->{timeout} => sub {
168 0     0     my $loop = shift;
169 0           $self->log->error('timeout wating for greeting');
170 0           $loop->remove($clientid);
171 0           $self->{auth} = 0;
172 0           });
173              
174 0           $self->log->debug('starting handshake');
175              
176 0     0     $self->_loop(sub { not defined $self->{auth} });
  0            
177              
178 0           $self->log->debug('done with handhake?');
179              
180 0           $self->ioloop->remove($tmr);
181 0           $self->unsubscribe('disconnect');
182 0           1;
183             }
184              
185             sub is_connected {
186 0     0 0   my $self = shift;
187 0   0       return $self->{auth} && !$self->ioloop->{__exit__};
188             }
189              
190             sub rpc_greetings {
191 0     0 0   my ($self, $c, $i) = @_;
192             #$self->ioloop->delay(
193             JobCenter::Client::Mojo::Steps->new(ioloop => $self->ioloop)->steps([
194             sub {
195 0     0     my $steps = shift;
196 0 0         die "wrong api version $i->{version} (expected 1.1)" unless $i->{version} eq '1.1';
197 0           $self->log->info('got greeting from ' . $i->{who});
198 0           $c->call('hello', {who => $self->who, method => $self->method, token => $self->token}, $steps->next());
199             },
200             sub {
201 0     0     my ($steps, $e, $r) = @_;
202 0           my $w;
203             #say 'hello returned: ', Dumper(\@_);
204 0 0         die "hello returned error $e->{message} ($e->{code})" if $e;
205 0 0         die 'no results from hello?' unless $r;
206 0           ($r, $w) = @$r;
207 0 0         if ($r) {
208 0           $self->log->info("hello returned: $r, $w");
209 0           $self->{auth} = 1;
210             } else {
211 0   0       $self->log->error('hello failed: ' . ($w // ''));
212 0           $self->{auth} = 0; # defined but false
213             }
214             }], sub {
215 0     0     my ($err) = @_;
216 0           $self->log->error('something went wrong in handshake: ' . $err);
217 0           $self->{auth} = '';
218 0           });
219             }
220              
221             sub rpc_job_done {
222 0     0 0   my ($self, $conn, $i) = @_;
223 0           my $job_id = $i->{job_id};
224 0           my $outargs = $i->{outargs};
225 0           my $outargsj = $self->{jsonobject}->encode($outargs);
226 0 0         $outargs = $outargsj if $self->{json};
227 0           $outargsj = decode_utf8($outargsj); # for debug printing
228 0           my $rescb = delete $self->{jobs}->{$job_id};
229 0 0         if ($rescb) {
230 0           $self->log->debug("got job_done: for job_id $job_id result: $outargsj");
231 0           local $@;
232 0           eval {
233 0           $rescb->($job_id, $outargs);
234             };
235 0 0         $self->log->info("got $@ calling result callback") if $@;
236             } else {
237 0           $self->log->debug("got job_done for unknown job $job_id result: $outargsj");
238             }
239             }
240              
241             sub call {
242 0     0 1   my ($self, %args) = @_;
243 0           my ($done, $job_id, $outargs);
244             $args{cb1} = sub {
245 0     0     ($job_id, $outargs) = @_;
246 0 0         $done++ unless $job_id;
247 0           };
248             $args{cb2} = sub {
249 0     0     ($job_id, $outargs) = @_;
250 0           $done++;
251 0           };
252 0           $self->call_nb(%args);
253              
254 0     0     $self->_loop(sub { !$done });
  0            
255              
256 0           return $job_id, $outargs;
257             }
258              
259             sub call_nb {
260 0     0 1   my ($self, %args) = @_;
261 0 0         my $wfname = $args{wfname} or die 'no workflowname?';
262 0           my $vtag = $args{vtag};
263 0   0       my $inargs = $args{inargs} // '{}';
264 0   0       my $callcb = $args{cb1} // die 'no call callback?';
265 0   0       my $rescb = $args{cb2} // die 'no result callback?';
266 0   0       my $timeout = $args{timeout} // $self->timeout * 5; # a bit hackish..
267 0           my $reqauth = $args{reqauth};
268 0           my $clenv = $args{clenv};
269 0           my $inargsj;
270              
271 0 0         if ($self->{json}) {
272 0           $inargsj = $inargs;
273 0           $inargs = $self->{jsonobject}->decode($inargs);
274 0 0         croak 'inargs is not a json object' unless ref $inargs eq 'HASH';
275 0 0         if ($clenv) {
276 0           $clenv = decode_json($clenv);
277 0 0         croak 'clenv is not a json object' unless ref $clenv eq 'HASH';
278             }
279 0 0         if ($reqauth) {
280 0           $reqauth = decode_json($reqauth);
281 0 0         croak 'reqauth is not a json object' unless ref $reqauth eq 'HASH';
282             }
283             } else {
284 0 0         croak 'inargs should be a hashref' unless ref $inargs eq 'HASH';
285             # test encoding
286 0           $inargsj = $self->{jsonobject}->encode($inargs);
287 0 0         if ($clenv) {
288 0 0         croak 'clenv should be a hashref' unless ref $clenv eq 'HASH';
289             }
290 0 0         if ($reqauth) {
291 0 0         croak 'reqauth should be a hashref' unless ref $reqauth eq 'HASH';
292             }
293             }
294              
295 0           $inargsj = decode_utf8($inargsj);
296 0 0         $self->log->debug("calling $wfname with '" . $inargsj . "'" . (($vtag) ? " (vtag $vtag)" : ''));
297              
298             JobCenter::Client::Mojo::Steps->new(ioloop => $self->ioloop)->steps([
299             sub {
300 0     0     my $steps = shift;
301 0 0         $self->conn->call('create_job', {
    0          
302             wfname => $wfname,
303             vtag => $vtag,
304             inargs => $inargs,
305             timeout => $timeout,
306             ($clenv ? (clenv => $clenv) : ()),
307             ($reqauth ? (reqauth => $reqauth) : ()),
308             }, $steps->next());
309             },
310             sub {
311 0     0     my ($steps, $e, $r) = @_;
312 0           my ($job_id, $msg);
313 0 0         if ($e) {
314 0           $self->log->error("create_job returned error: $e->{message} ($e->{code}");
315 0           $msg = "$e->{message} ($e->{code}"
316             } else {
317 0           ($job_id, $msg) = @$r; # fixme: check for arrayref?
318 0 0         if ($msg) {
    0          
319 0           $self->log->error("create_job returned error: $msg");
320             } elsif ($job_id) {
321 0           $self->log->debug("create_job returned job_id: $job_id");
322 0           $self->jobs->{$job_id} = $rescb;
323             }
324             }
325 0 0         if ($msg) {
326 0 0         $msg = {error => $msg} unless ref $msg;
327 0 0         $msg = $self->{jsonobject}->encode($msg) if $self->{json};
328             }
329 0           $callcb->($job_id, $msg);
330             }], sub {
331 0     0     my ($err) = @_;
332 0           $self->log->error("Something went wrong in call_nb: $err");
333 0           $err = { error => $err };
334 0 0         $err = $self->{jsonobject}->encode($err) if $self->{json};
335 0           $callcb->(undef, $err);
336 0           });
337             }
338              
339             sub close {
340 0     0 1   my ($self) = @_;
341              
342 0           $self->log->debug('closing connection');
343 0           $self->conn->close();
344 0           $self->ns->close();
345 0           %$self = ();
346             }
347              
348             sub find_jobs {
349 0     0 1   my ($self, $filter) = @_;
350 0 0         croak('no filter?') unless $filter;
351              
352             #print 'filter: ', Dumper($filter);
353 0 0         $filter = $self->{jsonobject}->encode($filter) if ref $filter eq 'HASH';
354              
355 0           my ($done, $err, $jobs);
356             JobCenter::Client::Mojo::Steps->new(ioloop => $self->ioloop)->steps([
357             sub {
358 0     0     my $steps = shift;
359             # fixme: check results?
360 0           $self->conn->call('find_jobs', { filter => $filter }, $steps->next());
361             },
362             sub {
363             #say 'find_jobs call returned: ', Dumper(\@_);
364 0     0     my ($steps, $e, $r) = @_;
365 0           $done++; # received something so done waiting
366 0 0         if ($e) {
367 0           $self->log->error("find_jobs got error $e->{message} ($e->{code})");
368 0           $err = $e->{message};
369 0           return;
370             }
371 0           $jobs = $r;
372             }], sub {
373 0     0     my ($err) = @_;
374 0           $done++;
375 0           $self->log->error("something went wrong with get_job_status: $err");
376 0           });
377              
378 0     0     $self->_loop(sub { !$done });
  0            
379              
380 0 0         return $err, @$jobs if ref $jobs eq 'ARRAY';
381 0           return $err;
382             }
383              
384             sub get_api_status {
385 0     0 0   my ($self, $what) = @_;
386 0 0         croak('no what?') unless $what;
387              
388 0           my ($done, $result);
389             JobCenter::Client::Mojo::Steps->new(ioloop => $self->ioloop)->steps([
390             sub {
391 0     0     my $steps = shift;
392 0           $self->conn->call('get_api_status', { what => $what }, $steps->next());
393             },
394             sub {
395             #say 'call returned: ', Dumper(\@_);
396 0     0     my ($d, $e, $r) = @_;
397 0           $done++; # received something so done waiting
398 0 0         if ($e) {
399 0           $self->log->error("get_api_status got error $e->{message} ($e->{code})");
400 0           $result = $e->{message};
401 0           return;
402             }
403 0           $result = $r;
404             }], sub {
405 0     0     my ($err) = @_;
406 0           $done++;
407 0           $self->log->eror("something went wrong with get_api_status: $err");
408 0           });
409              
410 0     0     $self->_loop(sub { !$done });
  0            
411              
412 0           return $result;
413             }
414              
415             sub get_job_status {
416 0     0 1   my ($self, $job_id) = @_;
417 0 0         croak('no job_id?') unless $job_id;
418              
419 0           my ($done, $job_id2, $outargs);
420             $self->get_job_status_nb(
421             job_id => $job_id,
422             statuscb => sub {
423 0     0     ($job_id2, $outargs) = @_;
424 0           $done++;
425 0           return;
426             },
427 0           );
428 0     0     $self->_loop(sub { !$done });
  0            
429 0           return $job_id2, $outargs;
430             }
431              
432             sub get_job_status_nb {
433 0     0 1   my ($self, %args) = @_;
434             my $job_id = $args{job_id} or
435 0 0         croak('no job_id?');
436              
437 0           my $statuscb = $args{statuscb};
438 0 0         croak('statuscb should be a coderef')
439             if ref $statuscb ne 'CODE';
440              
441 0           my $notifycb = $args{notifycb};
442 0 0 0       croak('notifycb should be a coderef')
443             if $notifycb and ref $notifycb ne 'CODE';
444              
445             #my ($done, $job_id2, $outargs);
446             JobCenter::Client::Mojo::Steps->new(ioloop => $self->ioloop)->steps([
447             sub {
448 0     0     my $steps = shift;
449             # fixme: check results?
450 0 0         $self->conn->call(
451             'get_job_status', {
452             job_id => $job_id,
453             notify => ($notifycb ? JSON->true : JSON->false),
454             }, $steps->next()
455             );
456             },
457             sub {
458             #say 'call returned: ', Dumper(\@_);
459 0     0     my ($steps, $e, $r) = @_;
460             #$self->log->debug("get_job_satus_nb got job_id: $res msg: $msg");
461 0 0         if ($e) {
462 0           $self->log->error("get_job_status got error $e->{message} ($e->{code})");
463 0           $statuscb->(undef, $e->{message});
464 0           return;
465             }
466 0           my ($job_id2, $outargs) = @$r;
467 0 0 0       if ($notifycb and !$job_id2 and !$outargs) {
      0        
468 0           $self->jobs->{$job_id} = $notifycb;
469             }
470 0 0 0       $outargs = $self->{jsonobject}->encode($outargs) if $self->{json} and ref $outargs;
471 0           $statuscb->($job_id2, $outargs);
472 0           return;
473             }], sub {
474 0     0     my ($err) = @_;
475 0           $self->log->error("Something went wrong in get_job_status_nb: $err");
476 0           $err = { error => $err };
477 0 0         $err = $self->{jsonobject}->encode($err) if $self->{json};
478 0           $statuscb->(undef, $err);
479 0           });
480             }
481              
482             sub ping {
483 0     0 1   my ($self, $timeout) = @_;
484              
485 0   0       $timeout //= $self->timeout;
486 0           my ($done, $ret);
487              
488             $self->ioloop->timer($timeout => sub {
489 0     0     $done++;
490 0           });
491              
492             $self->conn->call('ping', {}, sub {
493 0     0     my ($e, $r) = @_;
494 0 0 0       if (not $e and $r and $r =~ /pong/) {
      0        
495 0           $ret = 1;
496             } else {
497 0           %$self = ();
498             }
499 0           $done++;
500 0           });
501              
502 0     0     $self->_loop(sub { !$done });
  0            
503              
504 0           return $ret;
505             }
506              
507             sub work {
508 0     0 1   my ($self, $prepare) = @_;
509              
510 0           my $pt = $self->ping_timeout;
511 0           my $tmr;
512             $tmr = $self->ioloop->recurring($pt => sub {
513 0     0     my $ioloop = shift;
514 0   0       $self->log->debug('in ping_timeout timer: lastping: '
515             . ($self->lastping // 0) . ' limit: ' . (time - $pt) );
516 0 0 0       return if ($self->lastping // 0) > time - $pt;
517 0           $self->log->error('ping timeout');
518 0           $ioloop->remove($self->clientid);
519 0           $ioloop->remove($tmr);
520 0           $ioloop->{__exit__} = WORK_PING_TIMEOUT; # todo: doc
521 0           $ioloop->stop;
522 0 0         }) if $pt > 0;
523             $self->on(disconnect => sub {
524 0     0     my ($self, $code) = @_;
525 0           $self->ioloop->{__exit__} = $code;
526 0           $self->ioloop->stop;
527 0           });
528 0 0         return 0 if $prepare;
529              
530 0           $self->ioloop->{__exit__} = WORK_OK;
531 0           $self->log->debug('JobCenter::Client::Mojo starting work');
532 0 0         $self->ioloop->start unless Mojo::IOLoop->is_running;
533 0           $self->log->debug('JobCenter::Client::Mojo done?');
534 0 0         $self->ioloop->remove($tmr) if $tmr;
535              
536 0           return $self->ioloop->{__exit__};
537             }
538              
539             sub stop {
540 0     0 1   my ($self, $exit) = @_;
541 0           $self->ioloop->{__exit__} = $exit;
542 0           $self->ioloop->stop;
543             }
544              
545             sub create_slotgroup {
546 0     0 1   my ($self, $name, $slots) = @_;
547 0 0         croak('no slotgroup name?') unless $name;
548              
549 0           my ($done, $result);
550             JobCenter::Client::Mojo::Steps->new(ioloop => $self->ioloop)->steps([
551             sub {
552 0     0     my $steps = shift;
553 0           $self->conn->call('create_slotgroup', { name => $name, slots => $slots }, $steps->next());
554             },
555             sub {
556             #say 'call returned: ', Dumper(\@_);
557 0     0     my ($steps, $e, $r) = @_;
558 0           $done++; # received something so done waiting
559 0 0         if ($e) {
560 0           $self->log->error("create_slotgroup got error $e->{message}");
561 0           $result = $e->{message};
562 0           return;
563             }
564 0           $result = $r;
565             }], sub {
566 0     0     my ($err) = @_;
567 0           $done++;
568 0           $self->log->eror("something went wrong with create_slotgroup: $err");
569 0           });
570              
571 0     0     $self->_loop(sub { !$done });
  0            
572              
573 0           return $result;
574             }
575              
576             sub announce {
577 0     0 1   my ($self, %args) = @_;
578 0 0         my $actionname = $args{actionname} or croak 'no actionname?';
579 0 0         my $cb = $args{cb} or croak 'no cb?';
580             #my $async = $args{async} // 0;
581 0 0 0       my $mode = $args{mode} // (($args{async}) ? 'async' : 'sync');
582 0 0         croak "unknown callback mode $mode" unless $mode =~ /^(subproc|async|sync)$/;
583 0           my $undocb = $args{undocb};
584 0           my $host = hostname;
585 0   0       my $workername = $args{workername} // "$self->{who} $host $0 $$";
586              
587 0 0         croak "already have action $actionname" if $self->actions->{$actionname};
588              
589 0           my ($done, $err);
590             JobCenter::Client::Mojo::Steps->new(ioloop => $self->ioloop)->steps([
591             sub {
592 0     0     my $steps = shift;
593             $self->conn->call('announce', {
594             workername => $workername,
595             actionname => $actionname,
596             slotgroup => $args{slotgroup},
597             slots => $args{slots},
598 0 0         (($args{filter}) ? (filter => $args{filter}) : ()),
599             }, $steps->next());
600             },
601             sub {
602             #say 'call returned: ', Dumper(\@_);
603 0     0     my ($steps, $e, $r) = @_;
604 0           $done++; # received something so done waiting
605 0 0         if ($e) {
606 0           $self->log->error("announce got error: $e->{message}");
607 0           $err = $e->{message};
608 0           return;
609             }
610 0           my ($res, $msg) = @$r;
611 0           $self->log->debug("announce got res: $res msg: $msg");
612             $self->actions->{$actionname} = {
613             cb => $cb,
614             mode => $mode,
615             undocb => $undocb,
616 0 0 0       addenv => $args{addenv} // 0,
617             } if $res;
618 0 0         $err = $msg unless $res;
619             }], sub {
620 0     0     ($err) = @_;
621 0           $done++;
622 0           $self->log->error("something went wrong with announce: $err");
623 0           });
624              
625 0     0     $self->_loop(sub { !$done });
  0            
626 0           return $err;
627             }
628              
629             sub rpc_ping {
630 0     0 0   my ($self, $c, $i, $rpccb) = @_;
631 0           $self->lastping(time());
632 0           return 'pong!';
633             }
634              
635             sub rpc_task_ready {
636             #say 'got task_ready: ', Dumper(\@_);
637 0     0 0   my ($self, $c, $i) = @_;
638 0           my $actionname = $i->{actionname};
639 0           my $job_id = $i->{job_id};
640 0           my $action = $self->actions->{$actionname};
641 0 0         unless ($action) {
642 0           $self->log->info("got task_ready for unknown action $actionname");
643 0           return;
644             }
645              
646 0           $self->log->debug("got task_ready for $actionname job_id $job_id calling get_task");
647             JobCenter::Client::Mojo::Steps->new(ioloop => $self->ioloop)->steps([
648             sub {
649 0     0     my $steps = shift;
650 0           $c->call('get_task', {actionname => $actionname, job_id => $job_id}, $steps->next());
651             },
652             sub {
653 0     0     my ($steps, $e, $r) = @_;
654             #say 'get_task returned: ', Dumper(\@_);
655 0 0         if ($e) {
656 0           $$self->log->debug("got $e->{message} ($e->{code}) calling get_task");
657             }
658 0 0         unless ($r) {
659 0           $self->log->debug('no task for get_task');
660 0           return;
661             }
662 0           my ($cookie, @args);
663 0           ($job_id, $cookie, @args) = @$r;
664 0 0         unless ($cookie) {
665 0           $self->log->debug('aaah? no cookie? (get_task)');
666 0           return;
667             }
668 0 0         pop @args unless $action->{addenv}; # remove env
669 0           local $@;
670 0 0         if ($action->{mode} eq 'subproc') {
    0          
    0          
671 0           eval {
672 0           $self->_subproc($c, $action, $job_id, $cookie, @args);
673             };
674 0 0         $c->notify('task_done', { cookie => $cookie, outargs => { error => $@ } }) if $@;
675             } elsif ($action->{mode} eq 'async') {
676 0           eval {
677             $action->{cb}->($job_id, @args, sub {
678 0           $c->notify('task_done', { cookie => $cookie, outargs => $_[0] });
679 0           });
680             };
681 0 0         $c->notify('task_done', { cookie => $cookie, outargs => { error => $@ } }) if $@;
682             } elsif ($action->{mode} eq 'sync') {
683 0           my $outargs = eval { $action->{cb}->($job_id, @args) };
  0            
684 0 0         $outargs = { error => $@ } if $@;
685 0           $c->notify('task_done', { cookie => $cookie, outargs => $outargs });
686             } else {
687 0           die "unkown mode $action->{mode}";
688             }
689             }], sub {
690 0     0     my ($err) = @_;
691 0           $self->log->error("something went wrong with rpc_task_ready: $err");
692 0           });
693             }
694              
695             sub _subproc {
696 0     0     my ($self, $c, $action, $job_id, $cookie, @args) = @_;
697              
698             # based on Mojo::IOLoop::Subprocess
699 0           my $ioloop = $self->ioloop;
700              
701             # Pipe for subprocess communication
702 0 0         pipe(my $reader, my $writer) or die "Can't create pipe: $!";
703              
704 0 0         die "Can't fork: $!" unless defined(my $pid = fork);
705 0 0         unless ($pid) {# Child
706 0           $self->log->debug("in child $$");;
707 0           $ioloop->reset;
708 0           CORE::close $reader; # or we won't get a sigpipe when daddy dies..
709 0           my $undo = 0;
710 0           my $outargs = eval { $action->{cb}->($job_id, @args) };
  0            
711 0 0 0       if ($@) {
    0          
712 0           $outargs = {'error' => $@};
713 0           $undo++;
714             } elsif (ref $outargs eq 'HASH' and $outargs->{'error'}) {
715 0           $undo++;
716             }
717 0 0 0       if ($undo and $action->{undocb}) {
718 0           $self->log->info("undoing for $job_id");;
719 0           my $res = eval { $action->{undocb}->($job_id, @args); };
  0            
720 0 0         $res = $@ if $@;
721             # how should this look?
722             $outargs = {'error' => {
723             'msg' => 'undo failure',
724             'undo' => $res,
725             'olderr' => $outargs->{error},
726 0           }};
727 0           $undo = 0;
728             }
729             # stop ignoring sigpipe
730 0     0     $SIG{PIPE} = sub { $undo++ };
  0            
731             # if the parent is gone we get a sigpipe here:
732 0           print $writer Storable::freeze($outargs);
733 0 0         $writer->flush or $undo++;
734 0 0         CORE::close $writer or $undo++;
735 0 0 0       if ($undo and $action->{undocb}) {
736 0           $self->log->info("undoing for $job_id");;
737 0           eval { $action->{undocb}->($job_id, @args); };
  0            
738             # ignore errors because we can't report them back..
739             }
740             # FIXME: normal exit?
741 0           POSIX::_exit(0);
742             }
743              
744             # Parent
745 0           my $me = $$;
746 0           CORE::close $writer;
747 0           my $stream = Mojo::IOLoop::Stream->new($reader)->timeout(0);
748 0           $ioloop->stream($stream);
749 0           my $buffer = '';
750 0     0     $stream->on(read => sub { $buffer .= pop });
  0            
751             $stream->on(
752             close => sub {
753             #say "close handler!";
754 0 0   0     return unless $$ == $me;
755 0           waitpid $pid, 0;
756 0           my $outargs = eval { Storable::thaw($buffer) };
  0            
757 0 0         $outargs = { error => $@ } if $@;
758 0 0 0       if ($outargs and ref $outargs eq 'HASH') {
759 0           $self->log->debug('subprocess results: ' . Dumper($outargs));
760 0           eval { $c->notify(
  0            
761             'task_done',
762             { cookie => $cookie, outargs => $outargs }
763             ); }; # the connection might be gone?
764             } # else?
765             }
766 0           );
767             }
768              
769             # tick while Mojo::Reactor is still running and condition callback is true
770             sub _loop {
771 0 0   0     warn __PACKAGE__." recursing into IO loop" if state $looping++;
772              
773 0           my $reactor = $_[0]->ioloop->singleton->reactor;
774 0           my $err;
775              
776 0 0         if (ref $reactor eq 'Mojo::Reactor::EV') {
    0          
777              
778 0           my $active = 1;
779              
780 0   0       $active = $reactor->one_tick while $_[1]->() && $active;
781              
782             } elsif (ref $reactor eq 'Mojo::Reactor::Poll') {
783              
784 0           $reactor->{running}++;
785              
786 0   0       $reactor->one_tick while $_[1]->() && $reactor->is_running;
787              
788 0   0       $reactor->{running} &&= $reactor->{running} - 1;
789              
790             } else {
791              
792 0           $err = "unknown reactor: ".ref $reactor;
793             }
794              
795 0           $looping--;
796 0 0         die $err if $err;
797             }
798              
799             #sub DESTROY {
800             # my $self = shift;
801             # say 'destroying ', $self;
802             #}
803              
804             1;
805              
806             =encoding utf8
807              
808             =head1 NAME
809              
810             JobCenter::Client::Mojo - JobCenter JSON-RPC 2.0 Api client using Mojo.
811              
812             =head1 SYNOPSIS
813              
814             use JobCenter::Client::Mojo;
815              
816             my $client = JobCenter::Client::Mojo->new(
817             address => ...
818             port => ...
819             who => ...
820             token => ...
821             );
822              
823             my ($job_id, $outargs) = $client->call(
824             wfname => 'test',
825             inargs => { test => 'test' },
826             );
827              
828             =head1 DESCRIPTION
829              
830             L is a class to build a client to connect to the
831             JSON-RPC 2.0 Api of the L workflow engine. The client can be
832             used to create and inspect jobs as well as for providing 'worker' services
833             to the JobCenter.
834              
835             =head1 METHODS
836              
837             =head2 new
838              
839             $client = JobCenter::Client::Mojo->new(%arguments);
840              
841             Class method that returns a new JobCenter::Client::Mojo object.
842              
843             Valid arguments are:
844              
845             =over 4
846              
847             =item - address: address of the Api.
848              
849             (default: 127.0.0.1)
850              
851             =item - port: port of the Api
852              
853             (default 6522)
854              
855             =item - tls: connect using tls
856              
857             (default false)
858              
859             =item - tls_ca: verify server using ca
860              
861             (default undef)
862              
863             =item - tls_key: private client key
864              
865             (default undef)
866              
867             =item - tls_ca: public client certificate
868              
869             (default undef)
870              
871             =item - who: who to authenticate as.
872              
873             (required)
874              
875             =item - method: how to authenticate.
876              
877             (default: password)
878              
879             =item - token: token to authenticate with.
880              
881             (required)
882              
883             =item - debug: when true prints debugging using L
884              
885             (default: false)
886              
887             =item - ioloop: L object to use
888              
889             (per default the L->singleton object is used)
890              
891             =item - json: flag wether input is json or perl.
892              
893             when true expects the inargs to be valid json, when false a perl hashref is
894             expected and json encoded. (default true)
895              
896             =item - jsonobject: json encoder/decoder object to use
897              
898             (per default a new L object is created)
899              
900             =item - log: L object to use
901              
902             (per default a new L object is created)
903              
904             =item - timeout: how long to wait for Api calls to complete
905              
906             (default 60 seconds)
907              
908             =item - ping_timeout: after this long without a ping from the Api the
909             connection will be closed and the work() method will return
910              
911             (default 5 minutes)
912              
913             =back
914              
915             =head2 call
916              
917             ($job_id, $result) = $client->call(%args);
918              
919             Creates a new L job and waits for the results. Throws an error
920             if somethings goes wrong immediately. Errors encountered during later
921             processing are returned as a L error object.
922              
923             Valid arguments are:
924              
925             =over 4
926              
927             =item - wfname: name of the workflow to call (required)
928              
929             =item - inargs: input arguments for the workflow (if any)
930              
931             =item - vtag: version tag of the workflow to use (optional)
932              
933             =item - timeout: wait this many seconds for the job to finish
934             (optional, defaults to 5 times the Api-call timeout, so default 5 minutes)
935              
936             =item - reqauth: authentication token to be passed on to the authentication
937             module of the API for per job/request authentication.
938              
939             =item - clenv: client environment, made available as part of the job
940             environment and inherited to child jobs.
941              
942             =back
943              
944             =head2 call_nb
945              
946             $job_id = $client->call_nb(%args);
947              
948             Creates a new L job and call the provided callback on completion
949             of the job. Throws an error if somethings goes wrong immediately. Errors
950             encountered during later processing are returned as a L error
951             object to the callback.
952              
953             Valid arguments are those for L and:
954              
955             =over 4
956              
957             =item - cb1: coderef to the callback to call on job creation (requird)
958              
959             ( cb1 => sub { ($job_id, $err) = @_; ... } )
960              
961             If job_id is undefined the job was not created, the error is then returned
962             as the second return value.
963              
964             =item - cb2: coderef to the callback to call on job completion (requird)
965              
966             ( cb2 => sub { ($job_id, $outargs) = @_; ... } )
967              
968             =back
969              
970             =head2 get_job_status
971              
972             ($job_id, $result) = $client->get_job_status($job_id);
973              
974             Retrieves the status for the given $job_id. If the job_id does not exist
975             then the returned $job_id will be undefined and $result will be an error
976             message. If the job has not finished executing then both $job_id and
977             $result will be undefined. Otherwise the $result will contain the result of
978             the job. (Which may be a JobCenter error object)
979              
980             =head2 get_job_status_nb
981              
982             $client->get_job_status_nb(%args);
983              
984             Retrieves the status for the given $job_id.
985              
986             Valid arguments are:
987              
988             =over 4
989              
990             =item - job_id
991              
992             =item - statuscb: coderef to the callback for the current status
993              
994             ( statuscb => sub { ($job_id, $result) = @_; ... } )
995              
996             If the job_id does not exist then the returned $job_id will be undefined
997             and $result will be an error message. If the job has not finished executing
998             then both $job_id and $result will be undefined. Otherwise the $result will
999             contain the result of the job. (Which may be a JobCenter error object)
1000              
1001             =item - notifycb: coderef to the callback for job completion
1002              
1003             ( statuscb => sub { ($job_id, $result) = @_; ... } )
1004              
1005             If the job was still running when the get_job_status_nb call was made then
1006             this callback will be called on completion of the job.
1007              
1008             =back
1009              
1010             =head2 find_jobs
1011              
1012             ($err, @jobs) = $client->find_jobs({'foo'=>'bar'});
1013              
1014             Finds all currently running jobs with arguments matching the filter
1015             expression. The expression is evaluated in PostgreSQL using the @> for
1016             jsonb objects, basically this means that you can only do equality tests for
1017             one or more top-level keys. If @jobs is empty $err might contain an error
1018             message.
1019              
1020             =head2 ping
1021              
1022             $status = $client->ping($timeout);
1023              
1024             Tries to ping the JobCenter API. On success return true. On failure returns
1025             the undefined value, after that the client object should be undefined.
1026              
1027             =head2 close
1028              
1029             $client->close()
1030              
1031             Closes the connection to the JobCenter API and tries to de-allocate
1032             everything. Trying to use the client afterwards will produce errors.
1033              
1034             =head2 create_slotgroup
1035              
1036             $client->create_slotgroup($name, $slots)
1037              
1038             A 'slotgroup' is a way of telling the JobCenter API how many taskss the
1039             worker can do at once. The number of slots should be a positive integer.
1040             Slotgroups names starting with a _ (underscore) are reserved for internal
1041             use. Returns a error message when then was an error, the empty string
1042             otherwise.
1043              
1044             =head2 announce
1045              
1046             Announces the capability to do an action to the Api. The provided callback
1047             will be called when there is a task to be performed. Returns an error when
1048             there was a problem announcing the action.
1049              
1050             my $err = $client->announce(
1051             workername => 'me',
1052             actionname => 'do',
1053             slots => 1
1054             cb => sub { ... },
1055             );
1056             die "could not announce $actionname?: $err" if $err;
1057              
1058             See L for an example.
1059              
1060             Valid arguments are:
1061              
1062             =over 4
1063              
1064             =item - workername: name of the worker
1065              
1066             (optional, defaults to client->who, processname and processid)
1067              
1068             =item - actionname: name of the action
1069              
1070             (required)
1071              
1072             =item - cb: callback to be called for the action
1073              
1074             (required)
1075              
1076             =item - mode: callback mode
1077              
1078             (optional, default 'sync')
1079              
1080             Possible values:
1081              
1082             =over 8
1083              
1084             =item - 'sync': simple blocking mode, just return the results from the
1085             callback. Use only for callbacks taking less than (about) a second.
1086              
1087             =item - 'subproc': the simple blocking callback is started in a seperate
1088             process. Useful for callbacks that take a long time.
1089              
1090             =item - 'async': the callback gets passed another callback as the last
1091             argument that is to be called on completion of the task. For advanced use
1092             cases where the worker is actually more like a proxy. The (initial)
1093             callback is expected to return soonish to the event loop, after setting up
1094             some Mojo-callbacks.
1095              
1096             =back
1097              
1098             =item - async: backwards compatible way for specifying mode 'async'
1099              
1100             (optional, default false)
1101              
1102             =item - slotgroup: the slotgroup to use for accounting of parrallel tasks
1103              
1104             (optional, conflicts with 'slots')
1105              
1106             =item - slots: the amount of tasks the worker is able to process in parallel
1107             for this action.
1108              
1109             (optional, default 1, conflicts with 'slotgroup')
1110              
1111             =item - undocb: a callback that gets called when the original callback
1112             returns an error object or throws an error.
1113              
1114             Called with the same arguments as the original callback.
1115              
1116             (optional, only valid for mode 'subproc')
1117              
1118             =item - filter: only process a subset of the action
1119              
1120             The filter expression allows a worker to specify that it can only do the
1121             actionname for a certain subset of arguments. For example, for a "mkdir"
1122             action the filter expression {'host' => 'example.com'} would mean that this
1123             worker can only do mkdir on host example.com. Filter expressions are limited
1124             to simple equality tests on one or more keys, and only those keys that are
1125             allowed in the action definition. Filtering can be allowed, be mandatory or
1126             be forbidden per action.
1127              
1128             =item - addenv: pass on action enviroment to the callback
1129              
1130             If the addenv flag is true the action callback will be given one extra
1131             argument containing the action environment as a hashref. In the async
1132             callback mode the environment will be inserted before the result callback.
1133              
1134             =back
1135              
1136             =head2 work
1137              
1138             Starts the L. Returns a non-zero value when the IOLoop was
1139             stopped due to some error condition (like a lost connection or a ping
1140             timeout).
1141              
1142             =head3 Possible work() exit codes
1143              
1144             The JobCenter::Client::Mojo library currently defines the following exit codes:
1145              
1146             WORK_OK
1147             WORK_PING_TIMEOUT
1148             WORK_CONNECTION_CLOSED
1149              
1150             =head2 stop
1151              
1152             $client->stop($exit);
1153              
1154             Makes the work() function exit with the provided exit code.
1155              
1156             =head1 SEE ALSO
1157              
1158             =over 4
1159              
1160             =item *
1161              
1162             L, L, L: the L Web framework
1163              
1164             =item *
1165              
1166             L, L
1167              
1168             =back
1169              
1170             L: JobCenter Orchestration Engine
1171              
1172             =head1 ACKNOWLEDGEMENT
1173              
1174             This software has been developed with support from L.
1175             In German: Diese Software wurde mit Unterstützung von L entwickelt.
1176              
1177             =head1 THANKS
1178              
1179             Thanks to Eitan Schuler for reporting a bug and providing a pull request.
1180              
1181             =head1 AUTHORS
1182              
1183             =over 4
1184              
1185             =item *
1186              
1187             Wieger Opmeer
1188              
1189             =back
1190              
1191             =head1 COPYRIGHT AND LICENSE
1192              
1193             This software is copyright (c) 2017 by Wieger Opmeer.
1194              
1195             This is free software; you can redistribute it and/or modify it under
1196             the same terms as the Perl 5 programming language system itself.
1197              
1198             =cut
1199              
1200             1;