File Coverage

blib/lib/Mojo/Run.pm
Criterion Covered Total %
statement 53 285 18.6
branch 0 124 0.0
condition 0 72 0.0
subroutine 18 40 45.0
pod 1 11 9.0
total 72 532 13.5


line stmt bran cond sub pod time code
1             package Mojo::Run;
2              
3 1     1   25265 use Mojo::Base -base;
  1         13116  
  1         9  
4              
5 1     1   1095 use bytes;
  1         9  
  1         5  
6 1     1   27 use Carp;
  1         7  
  1         60  
7 1     1   793 use Errno;
  1         1155  
  1         53  
8 1     1   1100 use Socket;
  1         3731  
  1         477  
9 1     1   358344 use Time::HiRes qw(time gettimeofday);
  1         2330  
  1         7  
10 1     1   209 use Scalar::Util qw(blessed);
  1         1  
  1         160  
11 1     1   1089 use Storable qw(thaw nfreeze);
  1         4405  
  1         76  
12 1     1   1198 use POSIX ":sys_wait_h";
  1         8409  
  1         5  
13 1     1   2662 use Mojo::Log;
  1         171920  
  1         18  
14 1     1   1231 use Mojo::IOLoop;
  1         1089956  
  1         7  
15 1     1   49 use Data::Dumper;
  1         2  
  1         60  
16 1     1   6 use Mojo::Reactor;
  1         2  
  1         8  
17             has 'num_forks' => sub { 0 };
18             has 'max_forks' => sub { 0 };
19             has 'log' => sub { Mojo::Log ->new };
20             has 'ioloop' => sub { Mojo::IOLoop->new };
21             has [qw/reactor error is_child/];
22              
23             our $VERSION = '0.3';
24              
25             my $_obj = undef;
26              
27             BEGIN {
28 0     0   0 *portable_pipe = sub () { my ($r, $w);
29 0 0       0 pipe $r, $w or return;
30            
31 0         0 ($r, $w);
32 1     1   354 };
33             *portable_socketpair = sub () {
34 0 0   0   0 socketpair my $fh1, my $fh2, Socket::AF_UNIX(), Socket::SOCK_STREAM(), PF_UNSPEC
35             or return;
36 0         0 $fh1->autoflush(1);
37 0         0 $fh2->autoflush(1);
38            
39 0         0 ($fh1, $fh2)
40 1         2858 };
41             }
42              
43 0     0 1   sub new { __PACKAGE__->singleton }
44              
45             sub singleton {
46 0 0   0 0   return $_obj if defined $_obj;
47 0           return $_obj = __PACKAGE__->_constructor;
48             }
49              
50             sub _constructor {
51 0     0     my $proto = shift;
52 0   0       my $class = ref($proto) || $proto;
53 0           my $self = $class->SUPER::new;
54              
55 0           bless $self => $class;
56            
57             # install SIGCHLD handler
58 0     0     $SIG{'CHLD'} = sub { _sig_chld($self, @_) };
  0            
59            
60 0           return $self;
61             }
62              
63             sub log_level {
64 0     0 0   my ($self, $level) = @_;
65            
66 0 0         $self->log->level($level) if defined $level;
67            
68 0           return $self->log->level;
69             }
70              
71             sub spawn {
72 0     0 0   my ($self, %opt) = @_;
73            
74 0 0 0       unless (defined $self && blessed($self) && $self->isa(__PACKAGE__)) {
      0        
75 0           my $obj = __PACKAGE__->new;
76 0           return $obj->spawn(%opt);
77             }
78            
79 0           $self->error('');
80            
81 0 0 0       if ($self->max_forks > 0 && $self->num_forks >= $self->max_forks) {
82 0           $self->error("Unable to spawn another subprocess: "
83             ."Limit of " . $self->max_forks . " concurrently spawned process(es) is reached."
84             );
85 0           return 0;
86             }
87            
88             # normalize and validate run parameters...
89 0           my $proc = $self->_getRunStruct(\%opt);
90 0 0         return 0 unless $self->_validateRunStruct($proc);
91            
92 0 0         $self->log->debug("Spawning command "
93             ."timeout: "
94             .($proc->{exec_timeout} > 0 ? sprintf("%-.3f seconds]", $proc->{exec_timeout}) : "none")
95             ." : [$proc->{cmd}]"
96             );
97 0           my ($stdout_p, $stdout_c) = portable_socketpair;
98 0           my ($stderr_p, $stderr_c) = portable_socketpair;
99 0           my ($stdres_p, $stdres_c) = portable_socketpair;
100            
101 0           $proc->{time_started} = time;
102 0           $proc->{running } = 1;
103 0           $proc->{hdr_stdout } = $stdout_c;
104 0           $proc->{hdr_stderr } = $stderr_c;
105 0           $proc->{hdr_stdres } = $stdres_c;
106            
107 0           my $pid = fork;
108            
109 0 0         if ($pid) {
110             # parent
111 0           $self->num_forks($self->num_forks + 1);
112            
113 0           $self->log->debug("Subprocess spawned as pid $pid.");
114            
115 0           $proc->{pid} = $pid;
116            
117             # exec timeout
118 0 0 0       if (defined $proc->{exec_timeout} && $proc->{exec_timeout} > 0) {
119 0           $self->log->debug(
120             "[process $pid]: Setting execution timeout to " .
121             sprintf("%-.3f seconds.", $proc->{exec_timeout})
122             );
123             my $timer = $self->ioloop->timer(
124             $proc->{exec_timeout},
125 0     0     sub { _timeout_cb($self, $pid) }
126 0           );
127            
128             # save timer
129 0           $proc->{id_timeout} = $timer;
130             }
131            
132 0           $self->{_data}->{$pid} = $proc;
133            
134 0           close $stdout_p;
135 0           close $stderr_p;
136 0           close $stdres_p;
137            
138 0           $self->watch('stdout', $pid);
139 0           $self->watch('stderr', $pid);
140 0           $self->watch('stdres', $pid);
141             } else {
142             # child
143            
144 0           $self->is_child(1);
145            
146 0           close $stdout_c;
147 0           close $stderr_c;
148 0           close $stdres_c;
149            
150             # Stdio should not be tied.
151 0 0         if (tied *STDOUT) {
152 0           carp "Cannot redirect into tied STDOUT. Untying it";
153 0           untie *STDOUT;
154             }
155 0 0         if (tied *STDERR) {
156 0           carp "Cannot redirect into tied STDERR. Untying it";
157 0           untie *STDERR;
158             }
159            
160             # Redirect STDOUT
161 0 0         open STDOUT, ">&" . fileno($stdout_p)
162             or croak "can't redirect stdout in child pid $$: $!";
163             # Redirect STDERR
164 0 0         open STDERR, ">&" . fileno($stderr_p)
165             or croak "can't redirect stderr in child pid $$: $!";
166              
167 0           select STDERR; $| = 1;
  0            
168 0           select STDOUT; $| = 1;
  0            
169            
170 0 0         if (ref $proc->{cmd} eq 'CODE') {
171 0           my @rv = eval { $proc->{cmd}->($$, $proc->{param}); };
  0            
172            
173 0 0         if ($@) {
174 0           carp "exec of coderef failed: $@\n";
175 0           exit 255;
176             }
177            
178 0           print $stdres_p nfreeze(\ @rv);
179            
180             } else {
181 0 0         exec(ref $proc->{cmd} eq 'ARRAY' ? @{ $proc->{cmd} } : $proc->{cmd}) or do {
  0 0          
182 0           carp "exec failed";
183 0           exit 255;
184             };
185             }
186            
187 0           close $stdout_p;
188 0           close $stderr_p;
189 0           close $stdres_p;
190            
191 0           exit 1;
192             }
193            
194 0           return $pid;
195             }
196              
197 0     0 0   sub start { shift->ioloop->start }
198              
199             sub watch {
200 0     0 0   my $self = shift;
201 0   0       my $io = lc(shift || '');
202 0           my $pid = shift;
203            
204 0           my $proc = $self->get_proc($pid);
205            
206 0 0 0       $self->log->error('Cant start IO watcher off NULL process' ) and return unless $proc;
207 0 0 0       $self->log->error("[process $proc->{pid}]: IO ($io) is unsupported" ) and return unless $io ~~ [qw/stdout stderr stdres/];
208 0 0 0       $self->log->error("[process $proc->{pid}]: IO handler ($io) is EMPTY") and return unless $proc->{"hdr_$io"};
209            
210 0           my $id = fileno $proc->{"hdr_$io"};
211            
212             $self->ioloop->reactor->io($proc->{"hdr_$io"}, sub {
213 0     0     my $chunk = undef;
214 0           my $len = sysread $proc->{"hdr_$io"}, $chunk, 65536;
215            
216 0 0 0       return unless defined $len or $! != Errno::EINTR;
217            
218 0 0         if (!$len) {
219 0           $self->drop_handle($pid, $io);
220 0           return;
221             }
222            
223 0 0         if (defined $proc->{"$io\_cb"}) {
224 0           $self->log->debug("[process $proc->{pid}]: (handle: $id) Invoking ".uc($io)." callback.");
225            
226 0           eval { $proc->{"$io\_cb"}->($proc->{pid}, $chunk) };
  0            
227            
228 0 0         if ($@) {
229 0           $self->log->error("[process $proc->{pid}]: (handle: $id) Exception in $io\_cb: $@");
230             }
231             } else {
232             # append to buffer
233 0           $self->log->debug("[process $proc->{pid}]: (handle: $id) Appending $len bytes to ".uc($io)." buffer.");
234 0           $proc->{"buf_$io"} .= $chunk;
235             }
236 0           })->watch($proc->{"hdr_$io"}, 1, 0);
237             }
238              
239             sub drop_handle {
240 0     0 0   my $self = shift;
241 0           my $pid = shift;
242 0   0       my $io = lc(shift || '');
243            
244 0           my $proc = $self->get_proc($pid);
245 0 0         return unless $proc;
246            
247 0 0 0       $self->log->debug("[process $pid]: Got HUP for unmanaged handle ".$proc->{"hdr_$io"}."; ignoring.") and return
248             unless $proc->{"hdr_$io"};
249            
250            
251 0           $self->ioloop->remove( $proc->{"hdr_$io"} );
252 0           undef $proc->{"hdr_$io"};
253            
254 0           $self->log->debug("[process $pid]: ".uc($io)." closed.");
255            
256 0           $self->complete($pid);
257             }
258              
259             sub get_proc {
260 0     0 0   my ($self, $pid) = @_;
261            
262 1     1   8 no warnings;
  1         2  
  1         418  
263 0           my $err = "[process $pid]: Unable to get process data structure: ";
264            
265 0 0         unless (defined $pid) {
266 0           $self->error($err . "Undefined pid.");
267 0           return undef;
268             }
269            
270 0 0 0       unless (
271             exists $self->{_data}->{$pid}
272             && defined $self->{_data}->{$pid}
273             ) {
274 0           $self->error($err . "Non-managed process pid: $pid");
275 0           return undef;
276             }
277              
278 0           return $self->{_data}->{$pid};
279             }
280              
281             sub cleanup {
282 0     0 0   my ($self, $pid, $exit_val, $signum, $core) = @_;
283            
284 0           my $proc = $self->get_proc($pid);
285 0 0         unless (defined $proc) {
286 1     1   6 no warnings;
  1         1  
  1         880  
287 0           $self->log->warn("Untracked process pid $pid exited with exit status $exit_val by signal $signum, core: $core.");
288 0           return 0;
289             }
290 0 0         return 0 if $proc->{cleanup};
291            
292 0           $proc->{cleanup} = 1;
293              
294 0 0         $self->log->debug("[process $pid]: Got SIGCHLD, "
295             . "exited with exit status: $exit_val by signal $signum"
296             . (($core) ? "with core dump." : ".")
297             );
298              
299 0 0         if (defined $proc->{id_timeout}) {
300 0           $self->ioloop->remove($proc->{id_timeout});
301 0           $proc->{id_timeout} = undef;
302             }
303 0 0         if ($proc->{hard_kill}) {
304 0           for (qw/stderr stdout stdres/) {
305 0 0         $self->drop_handle($pid, $_) if $proc->{"hdr_$_"};
306             }
307             }
308 0           $proc->{exit_status} = $exit_val;
309 0           $proc->{exit_core } = $core;
310 0           $proc->{exit_signal} = $signum;
311              
312             # command timings...
313 0           my $te = time;
314 0           $proc->{time_stopped } = $te;
315 0           $proc->{time_duration_exec} = $te - $proc->{time_started};
316              
317             # this process is no longer running
318 0           $proc->{running} = 0;
319              
320 0           $self->complete($pid);
321             }
322              
323             sub complete {
324 0     0 0   my ($self, $pid, $force) = @_;
325            
326 0           my $proc = $self->get_proc($pid);
327            
328 0 0 0       return 0 if !$force
      0        
329             && (
330             $proc->{running}
331             || defined $proc->{hdr_stdout}
332             || defined $proc->{hdr_stdres}
333             || defined $proc->{hdr_stderr}
334             );
335            
336              
337 0 0 0       if ($proc && %$proc) {
338 0           $self->log->debug("[process $pid]: All streams closed, process execution complete.");
339            
340 0           $proc->{time_duration_total} = time - $proc->{time_started};
341              
342             # fire exit callback!
343 0 0 0       if (defined $proc->{exit_cb} && ref $proc->{exit_cb} eq 'CODE') {
344 0 0         my $result = eval { $proc->{buf_stdres} ? thaw($proc->{buf_stdres}) : undef};
  0            
345            
346 0 0         if ($@) {
347 0           croak "Error de-serializing subprocess data: $@";
348             }
349            
350             # prepare callback structure
351 0           my $cb_d = {
352             cmd => ref $proc->{cmd} eq 'CODE' ? 'CODE' :
353 0 0         ref $proc->{cmd} eq 'ARRAY' ? join(' ', @{$proc->{cmd}}) :
    0          
    0          
    0          
354             $proc->{cmd}
355             ,
356             param => $proc->{param},
357             exit_status => $proc->{exit_status},
358             exit_signal => $proc->{exit_signal},
359             exit_core => $proc->{exit_core},
360             stdout => $proc->{buf_stdout},
361             stderr => ($proc->{buf_stderr} ? $proc->{buf_stderr} : '').($proc->{stderr} ? $proc->{stderr} : ''),
362             result => $result,
363             time_started => $proc->{time_started},
364             time_stopped => $proc->{time_stopped},
365             time_duration_exec => $proc->{time_duration_exec},
366             time_duration_total => $proc->{time_duration_total},
367             };
368              
369             # safely invoke callback
370 0           $self->log->debug("[process $pid]: invoking exit_cb callback.");
371 0           eval { $proc->{exit_cb}->($pid, $cb_d); };
  0            
372            
373 0 0         $self->log->error("[process $pid]: Error running exit_cb: $@") if $@;
374             } else {
375 0           $self->log->error("[process $pid]: No exit_cb callback!");
376             }
377             }
378              
379 0           delete $self->{_data}->{$pid};
380 0           $self->num_forks($self->num_forks - 1);
381             }
382              
383             sub _sig_chld {
384 0     0     my ($self) = @_;
385              
386 1     1   6 no strict 'subs';
  1         2  
  1         551  
387            
388 0           my $i = 0;
389 0           while ((my $pid = waitpid(-1, WNOHANG)) > 0) {
390 0           $i++;
391 0           my $exit_val = $? >> 8;
392 0           my $signum = $? & 127;
393 0           my $core = $? & 128;
394              
395             # do process cleanup
396 0           $self->cleanup($pid, $exit_val, $signum, $core);
397             }
398            
399 0 0         $self->log->debug("SIGCHLD handler cleaned up after $i process(es).")
400             if $i > 0;
401             }
402              
403             sub _getRunStruct {
404 0     0     my ($self, $opt) = @_;
405            
406 0           my $s = {
407             pid => 0,
408             cmd => undef,
409             param => undef,
410             error => undef,
411             stdout_cb => undef,
412             stderr_cb => undef,
413             exit_cb => undef,
414             exec_timeout => 0,
415             buf_stdout => '',
416             buf_stderr => '',
417             buf_stdres => '',
418             hdr_stdout => undef,
419             hdr_stderr => undef,
420             hdr_stdres => undef,
421             };
422              
423             # apply user defined vars...
424 0           $s->{$_} = $opt->{$_}
425 0           for grep { exists $s->{$_} } keys %$opt;
426              
427 0           return $s;
428             }
429              
430             sub _validateRunStruct {
431 0     0     my ($self, $s) = @_;
432              
433             # command?
434 0 0 0       $self->error('Undefined command.') and return
435             unless defined $s->{cmd};
436            
437             # check command...
438 0           my $cmd_ref = ref $s->{cmd};
439 0 0 0       $self->error('Zero-length command.') and return
      0        
440             if $cmd_ref eq '' && length $s->{cmd} == 0;
441            
442 0 0 0       $self->error('Command can be pure scalar, arrayref or coderef.') and return
      0        
443             if $cmd_ref ne '' && not $cmd_ref ~~ ['CODE', 'ARRAY'];
444              
445             # callbacks...
446 0 0 0       $self->error("STDOUT callback defined, but is not code reference.") and return
      0        
447             if defined $s->{stdout_cb} && ref $s->{stdout_cb} ne 'CODE';
448            
449 0 0 0       $self->error("STDERR callback defined, but is not code reference.") and return
      0        
450             if defined $s->{stderr_cb} && ref $s->{stderr_cb} ne 'CODE';
451            
452 0 0 0       $self->error("Process exit_cb callback defined, but is not code reference.") and return
      0        
453             if defined $s->{exit_cb} && ref($s->{exit_cb}) ne 'CODE';
454              
455             # exec timeout
456 1     1   5 { no warnings; $s->{exec_timeout} += 0; }
  1         2  
  1         659  
  0            
  0            
457              
458 0           return 1;
459             }
460              
461             sub _timeout_cb {
462 0     0     my ($self, $pid) = @_;
463            
464 0           my $proc = $self->get_proc($pid);
465 0 0         return 0 unless $proc;
466            
467             # drop timer (can't hurt...)
468 0 0         if (defined $proc->{id_timeout}) {
469 0           $self->ioloop->remove($proc->{id_timeout});
470 0           $proc->{id_timeout} = undef;
471             }
472              
473             # is process still alive?
474 0 0         return 0 unless kill 0, $pid;
475              
476 0           $self->log->debug("[process $pid]: Execution timeout ("
477             .sprintf("%-.3f seconds).", $proc->{exec_timeout})
478             ." Killing process.");
479              
480 0           $proc->{stderr} .= ";Execution timeout.";
481            
482             # kill the motherfucker!
483              
484 0 0         unless (CORE::kill(9, $pid)) {
485 0           $self->log->warn("[process $pid]: Unable to kill process: $!");
486             }
487 0           $proc->{hard_kill} = 1;
488 0           $self->cleanup($pid, 0, 9, 0);
489              
490 0           return 1;
491             }
492              
493             sub kill {
494 0     0 0   my ($self, $pid, $signal) = @_;
495 0 0         $signal = 15 unless defined $signal;
496            
497 0           my $proc = $self->get_proc($pid);
498 0 0         return 0 unless $proc;
499              
500             # kill the process...
501 0 0         unless (kill($signal, $pid)) {
502 0           $self->error("Unable to send signal $signal to process $pid: $!");
503 0           return 0;
504             }
505            
506 0           return 1;
507             }
508              
509             sub DESTROY {
510 0     0     my ($self) = @_;
511            
512             # perform cleanup...
513 0 0         unless ($self->is_child) {
514 0           foreach my $pid (keys %{$self->{_data}}) {
  0            
515 0           my $proc = $self->{_data}->{$pid};
516            
517 0 0         $self->log->debug("Killing subprocess $pid with SIGKILL") if $self->log;
518             # kill process (HARD!)
519 0           $self->kill($pid, 9);
520            
521 0 0         next unless defined $self->ioloop;
522            
523             # drop fds
524 0           $self->drop_handle($pid, $_) for grep {$proc->{"hdr_$_"}} qw/stdout stderr stdres/;
  0            
525            
526             # fire exit callbacks (if any)
527 0           $self->complete($pid, 1);
528             }
529             }
530              
531             # disable sigchld hander
532 0           $SIG{'CHLD'} = 'IGNORE';
533             }
534              
535             1;
536              
537             =pod
538            
539             =head1 NAME
540              
541             Mojo::Run - asynchronous external command execution for Mojo
542              
543             =head1 VERSION
544              
545             version 0.3
546              
547             =head1 SYNOPSIS
548              
549             use Mojo::Run;
550             use Mojo::Log;
551              
552             my $run = Mojo::Run->new;
553             $run->max_forks(10);
554             $run->log(Mojo::Log->new(
555             level => 'error',
556             path => 'log/mojo_run.log',
557             ));
558              
559             $run->spawn(
560             cmd => sub {
561             my $pid = shift;
562             my $param = shift; # {a => 1, b => 2}
563            
564             my $data = {};
565             ... do something
566             return $data;
567             },
568             param => {a => 1, b => 2},
569             exec_timeout => 120, # sec
570             stdout_cb => sub {
571             my ($pid, $chunk) = @_;
572             },
573             stderr_cb => sub {
574             my ($pid, $chunk) = @_;
575             },
576             exit_cb => sub {
577             my $pid = shift;
578             my $res = shift;
579             warn $res->{result}->[0];
580             },
581             );
582             $run->spawn(
583             cmd => 'ps aux',
584             exit_cb => sub {
585             my $pid = shift;
586             my $res = shift;
587             },
588             );
589             $run->spawn(
590             cmd => ['perl', '-v'],
591             exit_cb => sub {
592             my $pid = shift;
593             my $res = shift;
594             },
595             );
596              
597             $run->start;
598              
599             =head1 Result
600              
601             Result in B< exit_cb > is a HASH with following keys:
602              
603             =over
604            
605             =item B< cmd >
606              
607             =item B< param >
608              
609             =item B< exit_status >
610              
611             =item B< exit_signal >
612              
613             =item B< exit_core >
614              
615             =item B< stdout >
616              
617             =item B< stderr >
618              
619             =item B< result >
620              
621             =item B< time_started >
622              
623             =item B< time_stopped >
624              
625             =item B< time_duration_exec >
626              
627             =item B< time_duration_total >
628              
629             =back
630              
631             =head1 SOURCE REPOSITORY
632              
633             L
634              
635             =head1 AUTHOR
636              
637             Alexey Likhatskiy,
638              
639             =head1 LICENSE AND COPYRIGHT
640              
641             Copyright (C) 2012-2013 "Alexey Likhatskiy"
642              
643             This is free software; you can redistribute it and/or modify it under the same terms as the Perl 5 programming language system itself.