File Coverage

blib/lib/MojoX/Run.pm
Criterion Covered Total %
statement 9 9 100.0
branch n/a
condition n/a
subroutine 3 3 100.0
pod n/a
total 12 12 100.0


line stmt bran cond sub pod time code
1             package MojoX::Run;
2              
3 1     1   35684 use strict;
  1         2  
  1         41  
4 1     1   5 use warnings;
  1         2  
  1         32  
5              
6 1     1   5 use base 'Mojo::Base';
  1         5  
  1         772  
7              
8             use bytes;
9             use Time::HiRes qw(time);
10             use POSIX qw(:sys_wait_h);
11             use Scalar::Util qw(blessed);
12              
13             use Storable qw(thaw freeze);
14              
15             use Mojo::Log;
16             use Mojo::IOLoop;
17              
18             use MojoX::_Open3;
19             use MojoX::HandleRun;
20              
21             # timeout in seconds (~10 years)
22             use constant VERY_LONG_TIMEOUT => 60 * 60 * 24 * 365 * 10;
23              
24             # private logging object...
25             my $_log = Mojo::Log->new();
26              
27             # singleton object instance
28             my $_obj = undef;
29              
30             our $VERSION = '0.15';
31              
32             =head1 NAME
33              
34             MojoX::Run - asynchronous external command and subroutine execution for Mojo
35              
36             =head1 SYNOPSIS
37              
38             # create async executor SINGLETON object
39             my $mojox_run = MojoX::Run->singleton();
40            
41             # simple usage
42             my $pid = $mojox_run->spawn(
43             cmd => "ping -W 2 -c 5 host.example.org",
44             exit_cb => sub {
45             my ($pid, $res) = @_;
46             print "Ping finished with exit status $res->{exit_val}.\n";
47             print "\tSTDOUT:\n$res->{stdout}\n";
48             print "\tSTDERR:\n$res->{stderr}\n";
49             }
50             );
51             # check for injuries
52             unless ($pid) {
53             print "Command startup failed: ", $mojox_run->error(), "\n";
54             }
55            
56             # more complex example...
57             my $pid2 = $mojox_run->spawn(
58             cmd => 'ping -W 2 -c 5 host.example.org',
59             stdin_cb => sub {
60             my ($pid, $chunk) = @_;
61             print "STDOUT $pid: '$chunk'\n"
62             },
63             # ignore stderr
64             stderr_cb => sub {},
65             exit_cb => sub {
66             my ($pid, $res) = @_;
67             print "Process $res->{cmd} [pid: $pid] finished after $res->{time_duration_exec} second(s).\n";
68             print "Exit status: $res->{exit_status}";
69             print " by signal $res->{exit_signal}" if ($res->{exit_signal});
70             print " with coredump." if ($res->{exit_core});
71             print "\n";
72             }
73             );
74            
75             # even fancier usage: spawn coderef
76             my $pid3 = $mojox_run->spawn(
77             cmd => sub {
78             for (my $i = 0; $i < 10; $i++) {
79             if (rand() > 0.5) {
80             print STDERR rand(), "\n"
81             } else {
82             print rand(), "\n";
83             }
84             sleep int(rand(10));
85             }
86             exit (rand() > 0.5) ? 0 : 1;
87             },
88             exit_cb => {
89             print "Sub exited with $res->{exit_status}, STDOUT: $res->{stdout}\n";
90             },
91             );
92              
93             =head1 SIGCHLD WARNING
94              
95             Object instance of this class takes over B signal handler. You have been
96             warned!
97              
98             =head1 OBJECT CONSTRUCTOR
99              
100             =head2 new
101              
102             Alias for L method - object constructor always returns the same
103             object instance.
104              
105             This restriction is enforced becouse there can be only one active B
106             signal handler per process. However this shouldn't be a problem becouse
107             you can run multiple external processes simultaneously with MojoX::Run :)
108              
109             =cut
110             sub new {
111             return __PACKAGE__->singleton();
112             }
113              
114             =head2 singleton
115              
116             my $mojox_run = MojoX::Run->singleton();
117              
118             Returns singleton object instance of MojoX::Run. Singleton object uses Mojo's
119             L singleton instance. This is probably what you want instead of
120             creating your own private instance.
121              
122             =cut
123             sub singleton {
124             # return existing instance if available
125             return $_obj if (defined $_obj);
126            
127             # no singleton object? create one
128             $_obj = __PACKAGE__->_constructor();
129             return $_obj;
130             }
131              
132             # the real constructor
133             sub _constructor {
134             my $proto = shift;
135             my $class = ref($proto) || $proto;
136             my $self = $class->SUPER::new();
137              
138             bless($self, $class);
139             $self->_init();
140            
141             # do we have any arguments?
142             # argument can only be ioloop object...
143             $self->ioloop(@_) if (@_);
144            
145             return $self;
146             }
147              
148             sub DESTROY {
149             my ($self) = @_;
150            
151             # perform cleanup...
152             foreach my $pid (keys %{$self->{_data}}) {
153             my $proc = $self->{_data}->{$pid};
154              
155             # kill process (HARD!)
156             kill(9, $pid);
157             $_log->debug("Killing subprocess $pid with SIGKILL") if (defined $_log);
158              
159             my $loop = $self->ioloop();
160             next unless (defined $loop);
161              
162             # drop fds
163             if (defined $proc->{id_stdout}) {
164             $loop->drop($proc->{id_stdout});
165             }
166             if (defined $proc->{id_stderr}) {
167             $loop->drop($proc->{id_stderr});
168             }
169             if (defined $proc->{id_stdin}) {
170             $loop->drop($proc->{id_stdin});
171             }
172              
173             # fire exit callbacks (if any)
174             $self->_checkIfComplete($pid, 1);
175              
176             # remove struct
177             delete($self->{_data}->{$pid});
178             }
179              
180             # disable sigchld hander
181             $SIG{'CHLD'} = 'IGNORE';
182             }
183              
184             ##################################################
185             # PUBLIC METHODS #
186             ##################################################
187              
188             =head1 METHODS
189              
190             =head2 error
191              
192             my $err = $mojox_run->error();
193              
194             Returns last error.
195              
196             =cut
197              
198             sub error {
199             my ($self) = @_;
200             return $self->{_error};
201             }
202              
203             =head2 spawn
204              
205             my $pid = $mojox_run->spawn(%opt);
206              
207             Spawns new subprocess. The following options are supported:
208              
209             =over
210              
211             =item B (string/arrayref/coderef, undef, B):
212              
213             Command to be started. Command can be simple scalar, array reference or perl CODE reference
214             if you want to custom perl subroutine asynchronously.
215              
216             =item B (coderef, undef):
217              
218             Code that will be invoked when data were read from processes's stdout. If omitted, stdout output
219             will be returned as argument to B. Example:
220              
221             stdout_cb => sub {
222             my ($pid, $data) = @_;
223             print "Process $pid stdout: $data";
224             }
225              
226             =item B (coderef, undef):
227              
228             Code that will be invoked when data were read from processes's stderr. If omitted, stderr output
229             will be returned as argument to B. Example:
230              
231             stderr_cb => sub {
232             my ($pid, $data) = @_;
233             print "Process $pid stderr: $data";
234             }
235              
236             =item B (coderef, undef):
237              
238             Code that will be invoked when data wrote to process's stdin were flushed. Example:
239              
240             stdin_cb => sub {
241             my ($pid) = @_;
242             print "Process $pid: stdin was flushed.";
243             }
244              
245             =item B (coderef, undef, B)
246              
247             Code to be invoked after process exits and all handles have been flushed. Function is called
248             with 2 arguments: Process identifier (pid) and result structure. Example:
249              
250             exit_cb => sub {
251             my ($pid, $res) = @_;
252             print "Process $pid exited\n";
253             print "Execution error: $res->{error}\n" if (defined $res->{error});
254             print "Exit status: $pid->{exit_status}\n";
255             print "Killed by signal $pid->{exit_signal}\n" if ($res->{exit_signal});
256             print "Process dumped core.\n" if (res->{exit_core});
257             print "Process was started at: $res->{time_started}\n";
258             print "Process exited at $res->{time_stopped}\n";
259             print "Process execution duration: $res->{time_duration_exec}\n";
260             print "Execution duration: $res->{time_duration_total}\n";
261             print "Process stdout: $res->{stdout}\n";
262             print "Process stderr: $res->{stderr}\n";
263             }
264              
265             =item B (float, 0):
266              
267             If set to positive non-zero value, process will be killed after specified timeout of seconds. Timeout accuracy
268             depends on IOLoop's timeout() value (Default is 0.25 seconds).
269              
270             =back
271              
272             Returns non-zero process identifier (pid) on success, otherwise 0 and sets error.
273              
274             =cut
275              
276             sub spawn {
277             my ($self, %opt) = @_;
278             unless (defined $self && blessed($self) && $self->isa(__PACKAGE__)) {
279             my $obj = __PACKAGE__->new();
280             return $obj->spawn(%opt);
281             }
282             $self->{_error} = '';
283              
284             # normalize and validate run parameters...
285             my $o = $self->_getRunStruct(\%opt);
286             return 0 unless ($self->_validateRunStruct($o));
287              
288             # start exec!
289             return $self->_spawn($o);
290             }
291              
292             =head2 spawn_sub
293              
294             my $code = sub { return { a => 1, b => 2} };
295             my $pid = $mojox_run->spawn_sub(
296             $code,
297             exit_cb => sub {
298             my ($pid, $result, $exception) = @_;
299             }
300             );
301              
302             Spawns new subprocess in which $code subroutine will be executed. Return value of
303             subroutine will be delivered to B callback.
304              
305             The following options are supported:
306              
307             =over
308              
309             =item B (coderef, undef):
310              
311             Code that will be invoked when data wrote to process's stdin were flushed. Example:
312              
313             stdin_cb => sub {
314             my ($pid) = @_;
315             print "Process $pid: stdin was flushed.";
316             }
317              
318             =item B (coderef, undef, B)
319              
320             Code to be invoked after process exits and all handles have been flushed. Function is called
321             with 2 arguments: Process identifier (pid) and result structure. Example:
322              
323             exit_cb => sub {
324             my ($pid, $result, $exception) = @_;
325             if ($exception) {
326             print "Horrible exception accoured while executing subroutine: $exception";
327             return;
328             }
329            
330             # result is always arrayref, becouse subs can return list values!
331             print "Got async sub result: ", Dumper($result), "\n";
332             }
333              
334             =item B (float, 0):
335              
336             If set to positive non-zero value, process will be killed after specified timeout of seconds. Timeout accuracy
337             depends on IOLoop's timeout() value.
338              
339             =back
340              
341             Returns non-zero process identifier (pid) on success, otherwise 0 and sets error.
342              
343             =cut
344             sub spawn_sub {
345             my ($self, $sub, %opt) = @_;
346             unless (defined $sub && ref($sub) eq 'CODE') {
347             $self->{_error} = "First argument must be coderef.";
348             return 0;
349             }
350             my $exit_cb = delete($opt{exit_cb});
351             unless (defined $exit_cb && ref($exit_cb)) {
352             $self->{_error} = "No exit_cb defined!";
353             return 0;
354             }
355            
356             # remove stupid stuff from %opt
357             delete($opt{stdout_cb});
358             delete($opt{stderr_cb});
359            
360             # wrap sub to our custom routine
361             my $code = sub {
362             # run sub...
363             local $@;
364             my @rv = eval { $sub->() };
365            
366             # exception?
367             if ($@) {
368             print STDERR "Exception: $@";
369             CORE::exit(1);
370             }
371            
372             # we have a result!
373             print freeze(\ @rv);
374             CORE::exit(0);
375             };
376            
377             # wrap exit_cb to our routine
378             my $exit_code = sub {
379             my ($pid, $res) = @_;
380             my $ref = undef;
381             my $ex = undef;
382             # everything ok?
383             if ($res->{exit_status} == 0) {
384             local $@;
385             # try to de-serialize data...
386             $ref = eval { thaw($res->{stdout}) };
387             # check for injuries...
388             if ($@) {
389             $ex = "Error de-serializing subprocess data: $@";
390             }
391             } else {
392             $ex = $res->{stderr};
393             }
394            
395             # run exit cb...
396             $exit_cb->($pid, $ref, $ex);
397             };
398            
399             # spawn the goddamn sub
400             my $p = $self->spawn(
401             cmd => $code,
402             %opt,
403             exit_cb => $exit_code,
404             );
405            
406             return 0 unless ($p);
407              
408             # lock down stdout/err streams...
409             $self->_lock_output($p);
410            
411             return $p;
412             }
413              
414             =head2 stdin_write
415              
416             $mojox_run->stdin_write($pid, $data [, $cb]);
417              
418             Writes $data to stdin of process $pid if process still has opened stdin. If $cb is defined
419             code reference it will invoke it when data has been written. If $cb is omitted B
420             will be invoked if is set for process $pid.
421              
422             Returns 1 on success, otherwise 0 and sets error.
423              
424             =cut
425              
426             sub stdin_write {
427             my ($self, $pid, $data, $cb) = @_;
428             my $proc = $self->_getProcStruct($pid);
429             unless (defined $pid && defined $proc) {
430             $self->{_error} =
431             "Unable to write to process pid '$pid' stdin: Unamanaged process pid or process stdin is already closed.";
432             return 0;
433             }
434              
435             # is stdin still opened?
436             unless (defined $proc->{id_stdin}) {
437             $self->{_error} = "STDIN handle is already closed.";
438             return 0;
439             }
440              
441             # do we have custom callback?
442             if (defined $cb) {
443             unless (ref($cb) eq 'CODE') {
444             $self->{_error} =
445             "Optional second argument must be code reference.";
446             return 0;
447             }
448             }
449             else {
450              
451             # do we have stdin callback?
452             if (defined $proc->{stdin_cb} && ref($proc->{stdin_cb}) eq 'CODE') {
453             $cb = $proc->{stdin_cb};
454             }
455             }
456              
457             # write data
458             $self->ioloop()->write($proc->{id_stdin}, $data, $cb);
459             return 1;
460             }
461              
462             =head2 stdout_cb
463              
464             # set
465             $mojox_run->stdout_cb($pid, $cb);
466             # get
467             my $cb = $mojox_run->stdout_cb($pid);
468              
469             If called without $cb argument returns stdout callback for process $pid, otherwise
470             sets stdout callback. If $cb is undefined, removes callback.
471              
472             Returns undef on error and sets error message.
473              
474             =cut
475              
476             sub stdout_cb {
477             my ($self, $pid, $cb) = @_;
478             return $self->__handle_cb($pid, 'stdout', $cb);
479             }
480              
481             =head2 stderr_cb
482              
483             # set
484             $mojox_run->stderr_cb($pid, $cb);
485             # get
486             $cb = $mojox_run->stderr_cb($pid);
487              
488             If called without $cb argument returns stderr callback for process $pid, otherwise
489             sets stderr callback. If $cb is undefined, removes callback.
490              
491             Returns undef on error and sets error message.
492              
493             =cut
494              
495             sub stderr_cb {
496             my ($self, $pid, $cb) = @_;
497             return $self->__handle_cb($pid, 'stderr', $cb);
498             }
499              
500             =head2 stdin_cb
501              
502             # set
503             $mojox_run->stdin_cb($pid, $cb);
504             # get
505             $mojox_run->stdin_cb($pid);
506              
507             If called without $cb argument returns stdin callback for process $pid, otherwise
508             sets stdin callback. If $cb is undefined, removes callback.
509              
510             Returns undef on error and sets error message.
511              
512             =cut
513              
514             sub stdin_cb {
515             my ($self, $pid, $cb) = @_;
516             return $self->__handle_cb($pid, 'stdin', $cb);
517             }
518              
519             =head2 stdin_close
520              
521             $mojox_run->stdin_close($pid);
522              
523             Closes stdin handle to specified process. You need to explicitly close stdin
524             if spawned program doesn't exit until it's stdin is not closed.
525              
526             =cut
527              
528             sub stdin_close {
529             my ($self, $pid) = @_;
530             my $proc = $self->_getProcStruct($pid);
531             return 0 unless (defined $proc);
532              
533             # is stdin opened?
534             my $id_stdin = $proc->{id_stdin};
535             unless (defined $id_stdin) {
536             $self->{_error} = "STDIN is already closed.";
537             return 0;
538             }
539              
540             my $loop = $self->ioloop();
541             unless (defined $loop) {
542             $self->{_error} = "Undefined IOLoop.";
543             return 0;
544             }
545              
546             # drop handle...
547             $loop->drop($id_stdin);
548             $proc->{id_stdin} = undef;
549              
550             return 1;
551             }
552              
553             =head2 stdout_buf
554              
555             # just get it
556             $buf = $mojox_run->stdout_buf($pid);
557             # get and drain
558             $buf = $mojox_run->stdout_buf($pid, 1);
559              
560             Returns contents of stdout buffer for process $pid on success, otherwise undef.
561              
562             Internal buffer is cleared if invoked with non-zero second argument.
563              
564             =cut
565              
566             sub stdout_buf {
567             my ($self, $pid, $clear) = @_;
568             $clear = 0 unless (defined $clear);
569             my $proc = $self->_getProcStruct($pid);
570             return undef unless (defined $proc);
571             return undef if ($proc->{out_locked});
572              
573             # clear buffer?
574             $proc->{buf_stdout} = '' if ($clear);
575             return $proc->{buf_stdout};
576             }
577              
578             =head2 stdout_buf_clear
579              
580             $buf = $mojox_run->stdout_buf_clear($pid);
581              
582             Clears stdout buffer for process $pid. Returns string containing buffer contents on success, otherwise undef.
583              
584             =cut
585              
586             sub stdout_buf_clear {
587             return shift->stdout_buf($_[0], 1);
588             }
589              
590             =head2 stderr_buf
591              
592             # just get it
593             $buf = $mojox_run->stderr_buf($pid);
594             # get and drain
595             $buf = $mojox_run->stderr_buf($pid, 1);
596              
597             Returns contents of stderr buffer for process $pid on success, otherwise undef.
598              
599             Internal buffer is cleared if invoked with non-zero second argument.
600              
601             =cut
602              
603             sub stderr_buf {
604             my ($self, $pid, $clear) = @_;
605             $clear = 0 unless (defined $clear);
606             my $proc = $self->_getProcStruct($pid);
607             return undef unless (defined $proc);
608             return undef if ($proc->{out_locked});
609              
610             # clear buffer?
611             $proc->{buf_stderr} = '' if ($clear);
612             return $proc->{buf_stderr};
613             }
614              
615             =head2 stderr_buf_clear
616              
617             $buf = $mojox_run->stderr_buf_clear($pid);
618              
619             Clears stderr buffer for process $pid. Returns empty string on success, otherwise undef.
620              
621             =cut
622              
623             sub stderr_buf_clear {
624             return shift->stderr_buf($_[0], 1);
625             }
626              
627             =head2 kill
628              
629             $mojox_run->kill($pid [, $signal = 15]);
630              
631             Kills process $pid with specified signal. Returns 1 on success, otherwise 0.
632              
633             =cut
634              
635             sub kill {
636             my ($self, $pid, $signal) = @_;
637             $signal = 15 unless (defined $signal);
638             my $proc = $self->_getProcStruct($pid);
639             return 0 unless (defined $proc);
640              
641             # kill the process...
642             unless (kill($signal, $pid)) {
643             $self->{_error} = "Unable to send signal $signal to process $pid: $!";
644             return 0;
645             }
646             return 1;
647             }
648              
649             =head2 log_level ([$level])
650              
651             Gets or sets loglevel for private logger instance. See L for additional instructions.
652              
653             =cut
654              
655             sub log_level {
656             my ($self, $level) = @_;
657             if (defined $level) {
658             my $prev_level = $_log->level();
659             $_log->level($level);
660             }
661             return $_log->level();
662             }
663              
664             =head2 num_running
665              
666             Returns number of currently managed sub-processes.
667              
668             =cut
669             sub num_running {
670             my ($self) = @_;
671             return scalar(keys %{$self->{_data}});
672             }
673              
674             =head2 max_running
675              
676             $mojox_run->max_running($limit);
677              
678             Returns currently set concurrently running subprocesses limit if called without arguments.
679             If called with integer argument sets new limit of concurrently spawned external processes
680             and returns old limit.
681              
682             Value of 0 means that there is no limit.
683              
684             =cut
685             sub max_running {
686             my $self = shift;
687             # used provided argument?
688             if (@_) {
689             my $limit = shift;
690             # invalid limit?
691             return $self->{_max_running} unless (defined $limit);
692             { no warnings; $limit += 0; }
693             my $old_limit = $self->{_max_running};
694            
695             # issue warning about overflow...
696             if ($limit > 0 && $limit <= $self->num_running()) {
697             $_log->warn(
698             "New limit of $limit concurrently managed subprocesses is lower " .
699             "than current number of managed subprocesses (" .
700             $self->num_running() .
701             "); new process creation will be refused until one or more " .
702             " currently managed subprocesses won't exit."
703             );
704             }
705            
706             # set new limit
707             $self->{_max_running} = ($limit > 0) ? $limit : 0;
708              
709             # return old limit
710             return $old_limit;
711             } else {
712             return $self->{_max_running};
713             }
714             }
715              
716             =head2 ioloop
717              
718             # get
719             $loop = $mojox_run->ioloop();
720             # set
721             $mojox_run->ioloop($loop);
722              
723             Returns currently used ioloop if called without arguments. Currently
724             used IO loop if changed invoked with initialized L argument -
725             you better be sure what you're doing!
726              
727             =cut
728              
729             sub ioloop {
730             my ($self, $loop) = @_;
731             # no valid $loop argument?
732             unless (defined $loop && blessed($loop) && $loop->isa('Mojo::IOLoop')) {
733             # custom loop?
734             return $self->{_loop} if (defined $self->{_loop});
735             # return singleton loop
736             return Mojo::IOLoop->singleton();
737             }
738            
739             # assign custom ioloop
740             $self->{_loop} = $loop;
741             return $self->{_loop};
742             }
743              
744             ##################################################
745             # PRIVATE METHODS #
746             ##################################################
747              
748             sub __handle_cb {
749             my $self = shift;
750             my $pid = shift;
751             my $name = shift;
752              
753             $self->{_error} = '';
754              
755             my $proc = $self->_getProcStruct($pid);
756             return undef unless (defined $proc);
757              
758             my $key = $name . '_cb';
759             unless (exists($proc->{$key})) {
760             $self->{_error} = "Invalid callback name: $name";
761             return undef;
762             }
763              
764             # save old callback
765             my $old_cb = $proc->{$key};
766             $self->{_error} = "Handle $name: no callback defined." unless (defined $old_cb);
767              
768             # should we set another callback?
769             if (@_) {
770             my $new_cb = shift;
771             unless (ref($new_cb) eq 'CODE') {
772             $self->{_error} = "Second argument must be code reference.";
773             return undef;
774             }
775             if ($proc->{out_locked} && ($name eq 'stdout' || $name eq 'stderr')) {
776             $self->{_error} = "Process was started by spawn_sub. Ouput streams are locked.";
777             return undef;
778             }
779              
780             # apply callback
781             $proc->{$key} = $new_cb;
782             }
783              
784             # return it...
785             return $old_cb;
786             }
787              
788             sub _spawn {
789             my ($self, $o) = @_;
790             unless (defined $o && ref($o) eq 'HASH') {
791             $self->{_error} =
792             "Invalid spawning options. THIS IS A " . __PACKAGE__ . ' BUG!!!';
793             return 0;
794             }
795            
796             # can we spawn another subprocess?
797             if ($self->max_running() > 0) {
798             if ($self->num_running() >= $self->max_running()) {
799             $self->{_error} = "Unable to spawn another subprocess: " .
800             "Limit of " . $self->num_running() . " concurrently spawned process(es) is reached.";
801             return 0;
802             }
803             }
804              
805             # time to do the job
806             $_log->debug("Spawning command "
807             . "[timeout: "
808             . ($o->{exec_timeout} > 0) ? sprintf("%-.3f seconds]", $o->{exec_timeout}) : "none"
809             . ": $o->{cmd}");
810              
811             # prepare stdio handles
812             my $stdin = MojoX::HandleRun->new();
813             my $stdout = MojoX::HandleRun->new();
814             my $stderr = MojoX::HandleRun->new();
815              
816             # prepare spawn structure
817             my $proc = {
818             out_locked => 0,
819             time_started => time(),
820             pid => 0,
821             cmd => $o->{cmd},
822             running => 1,
823             error => undef,
824             stdin_cb => ($o->{stdin_cb}) ? $o->{stdin_cb} : undef,
825             stdout_cb => ($o->{stdout_cb}) ? $o->{stdout_cb} : undef,
826             stderr_cb => ($o->{stderr_cb}) ? $o->{stderr_cb} : undef,
827             exit_cb => ($o->{exit_cb}) ? $o->{exit_cb} : undef,
828             timeout => $o->{exec_timeout},
829             buf_stdout => '',
830             buf_stderr => '',
831             id_stdin => undef,
832             id_stdout => undef,
833             id_stderr => undef,
834             id_timeout => undef,
835             };
836              
837             # spawn command
838             my $pid = undef;
839             eval { $pid = MojoX::_Open3::open3($stdin, $stdout, $stderr, $o->{cmd}) };
840             if ($@) {
841             $self->{_error} = "Exception while starting command '$o->{cmd}': $@";
842             return 0;
843             }
844             unless (defined $pid && $pid > 0) {
845             $self->{_error} = "Error starting external command: $!";
846             return 0;
847             }
848             $_log->debug("Subprocess spawned as pid $pid.");
849             $proc->{pid} = $pid;
850              
851             # make handles non-blocking...
852             $stdin->blocking(0);
853             $stdout->blocking(0);
854             $stderr->blocking(0);
855              
856             my $loop = $self->ioloop();
857              
858             # exec timeout
859             if (defined $o->{exec_timeout} && $o->{exec_timeout} > 0) {
860             $_log->debug(
861             "[process $pid]: Setting execution timeout to " .
862             sprintf("%-.3f seconds.", $o->{exec_timeout})
863             );
864             my $timer = $loop->timer(
865             $o->{exec_timeout},
866             sub { _timeout_cb($self, $pid) }
867             );
868              
869             # save timer
870             $proc->{id_timeout} = $timer;
871             }
872              
873             # add them to ioloop
874             my $id_stdout = $loop->connect(
875             socket => $stdout,
876             handle => $stdout,
877             on_error => sub { _error_cb($self, $pid, @_) },
878             on_hup => sub { _hup_cb($self, $pid, @_) },
879             on_read => sub { _read_cb($self, $pid, @_) },
880             );
881             my $id_stderr = $loop->connect(
882             socket => $stderr,
883             handle => $stderr,
884             on_error => sub { _error_cb($self, $pid, @_) },
885             on_hup => sub { _hup_cb($self, $pid, @_) },
886             on_read => sub { _read_cb($self, $pid, @_) },
887             );
888             my $id_stdin = $loop->connect(
889             socket => $stdin,
890             handle => $stdin,
891             on_error => sub { _error_cb($self, $pid, @_) },
892             on_hup => sub { _hup_cb($self, $pid, @_) },
893             on_read => sub { _read_cb($self, $pid, @_) },
894             );
895            
896             {
897             no warnings;
898             $_log->debug("[process $pid]: handles: stdin=$id_stdin, stdout=$id_stdout, stderr=$id_stderr");
899             }
900            
901             unless (defined $id_stdout && defined $id_stderr && defined $id_stdin) {
902             $self->{_error} = "Didn't get all handles from IOLoop. This is extremely weird, spawned process was killed.";
903             CORE::kill(9, $pid);
904             return 0;
905             }
906              
907             # STDIO FD timeouts
908             my $io_timeout = $o->{exec_timeout};
909             # no timeout at all? set insanely large value...
910             unless (defined $io_timeout && $io_timeout > 0) {
911             # i guess that there are no perl processes
912             # that live for 10 years...
913             $io_timeout = VERY_LONG_TIMEOUT;
914             }
915             # I/O timeout should be for at least one io loop's
916             # tick longer than execution timeout so that command
917             # closes streams itself, otherwise streams can be
918             # closed by ioloop which would result in incomplete
919             # output capture.
920             $io_timeout++;
921            
922             # apply stdio timeouts
923             $loop->connection_timeout($id_stdout, $io_timeout);
924             $loop->connection_timeout($id_stderr, $io_timeout);
925             $loop->connection_timeout($id_stdin, $io_timeout);
926             $_log->debug("[process $pid]: stdio stream timeout set to $io_timeout seconds.");
927              
928             # save loop fd ids
929             $proc->{id_stdin} = $id_stdin;
930             $proc->{id_stdout} = $id_stdout;
931             $proc->{id_stderr} = $id_stderr;
932              
933             # save structure...
934             $self->{_data}->{$pid} = $proc;
935              
936             return $pid;
937             }
938              
939             sub _lock_output {
940             my ($self, $pid) = @_;
941              
942             # get process struct...
943             my $proc = $self->_getProcStruct($pid);
944             return 0 unless (defined $proc);
945              
946             $proc->{out_locked} = 1;
947             return 1;
948             }
949              
950             sub _read_cb {
951             my ($self, $pid, $loop, $id, $chunk) = @_;
952             my $len = 0;
953             $len = length($chunk) if (defined $chunk);
954              
955             # no data?
956             return 0 unless ($len > 0);
957              
958             # get process struct...
959             my $proc = $self->_getProcStruct($pid);
960             return 0 unless (defined $proc);
961              
962             # id can be stdout or stderr (stdin is write-only)
963             if (defined $proc->{id_stdout} && $proc->{id_stdout} eq $id) {
964              
965             # do we have callback?
966             if (defined $proc->{stdout_cb}) {
967             $_log->debug("[process $pid]: (handle: $id) Invoking STDOUT callback.");
968             eval { $proc->{stdout_cb}->($pid, $chunk) };
969             if ($@) {
970             $_log->error("[process $pid]: (handle: $id) Exception in stdout_cb: $@");
971             }
972             }
973             else {
974              
975             # append to buffer
976             $_log->debug(
977             "[process $pid]: (handle: $id) Appending $len bytes to STDOUT buffer.");
978             $proc->{buf_stdout} .= $chunk;
979             }
980             }
981             elsif (defined $proc->{id_stderr} && $proc->{id_stderr} eq $id) {
982              
983             # do we have callback?
984             if (defined $proc->{stderr_cb}) {
985             $_log->debug("[process $pid]: (handle: $id) Invoking STDERR callback.");
986             eval { $proc->{stderr_cb}->($pid, $chunk) };
987             if ($@) {
988             $_log->error("[process $pid]: (handle: $id) Exception in stderr_cb: $@");
989             }
990             }
991             else {
992              
993             # append to buffer
994             $_log->debug(
995             "[process $pid]: (handle: $id) Appending $len bytes to STDERR buffer.");
996             $proc->{buf_stderr} .= $chunk;
997             }
998             }
999             else {
1000             $_log->debug("Got data from unmanaged handle $id; ignoring.");
1001             return 0;
1002             }
1003             }
1004              
1005             sub _hup_cb {
1006             my ($self, $pid, $loop, $id) = @_;
1007             # just drop the goddamn handle...
1008             return $self->_dropHandle($pid, $loop, $id);
1009             }
1010              
1011             sub _dropHandle {
1012             my ($self, $pid, $loop, $id) = @_;
1013              
1014             # get process structure
1015             my $proc = $self->_getProcStruct($pid);
1016             return 0 unless (defined $proc);
1017              
1018             if (defined $proc->{id_stdout} && $proc->{id_stdout} eq $id) {
1019             $proc->{id_stdout} = undef;
1020             $_log->debug("[process $pid]: STDOUT closed.");
1021             }
1022             elsif (defined $proc->{id_stderr} && $proc->{id_stderr} eq $id) {
1023             $proc->{id_stderr} = undef;
1024             $_log->debug("[process $pid]: STDERR closed.");
1025             }
1026             elsif (defined $proc->{id_stdin} && $proc->{id_stdin} eq $id) {
1027             $proc->{id_stdin} = undef;
1028             $_log->debug("[process $pid]: STDIN closed.");
1029             }
1030             else {
1031             $_log->debug("[process $pid]: Got HUP for unmanaged handle $id; ignoring.");
1032             return 0;
1033             }
1034              
1035             # drop handle...
1036             $loop->drop($id);
1037              
1038             # check if we're ready to deliver response
1039             $self->_checkIfComplete($pid);
1040             }
1041              
1042             sub _checkIfComplete {
1043             my ($self, $pid, $force) = @_;
1044             $force = 0 unless (defined $force);
1045              
1046             # get process structure
1047             my $proc = $self->_getProcStruct($pid);
1048             return 0 unless (defined $proc);
1049              
1050             # is process execution really complete?
1051             # a) it can be forced
1052             # b) all streams should be closed && sigchld must for pid
1053             if ($force
1054             || (
1055             !$proc->{running}
1056             && !defined $proc->{id_stdin}
1057             && !defined $proc->{id_stdout}
1058             && !defined $proc->{id_stderr}
1059             )
1060             )
1061             {
1062             $_log->debug(
1063             "[process $pid]: All streams closed, process execution complete.")
1064             unless ($force);
1065             $proc->{time_duration_total} = time() - $proc->{time_started};
1066              
1067             # fire exit callback!
1068             if (defined $proc->{exit_cb} && ref($proc->{exit_cb}) eq 'CODE') {
1069              
1070             # prepare callback structure
1071             my $cb_d = {
1072             cmd => (ref($proc->{cmd}) eq 'CODE') ?
1073             'CODE' :
1074             (
1075             (ref($proc->{cmd}) eq 'ARRAY') ?
1076             join(' ', @{$proc->{cmd}}) :
1077             $proc->{cmd}
1078             ),
1079             exit_status => $proc->{exit_val},
1080             exit_signal => $proc->{exit_signal},
1081             exit_core => $proc->{exit_core},
1082             error => ($force) ? "Forced process termination." : $proc->{error},
1083             stdout => $proc->{buf_stdout},
1084             stderr => $proc->{buf_stderr},
1085             time_started => $proc->{time_started},
1086             time_stopped => $proc->{time_stopped},
1087             time_duration_exec => $proc->{time_duration_exec},
1088             time_duration_total => $proc->{time_duration_total},
1089             };
1090              
1091             # safely invoke callback
1092             $_log->debug("[process $pid]: invoking exit_cb callback.") if (defined $_log);
1093             eval { $proc->{exit_cb}->($pid, $cb_d); };
1094             if ($@) {
1095             $_log->error("[process $pid]: Error running exit_cb: $@") if (defined $_log);
1096             }
1097             }
1098             else {
1099             $_log->error("[process $pid]: No exit_cb callback!");
1100             }
1101              
1102             # destroy process structure
1103             $self->_destroyProcStruct($pid);
1104             }
1105             }
1106              
1107             sub _destroyProcStruct {
1108             my ($self, $pid) = @_;
1109             delete($self->{_data}->{$pid});
1110             }
1111              
1112             sub _error_cb {
1113             my ($self, $pid, $loop, $id, $err) = @_;
1114             $_log->debug("[process $pid]: Error on handle $id: $err");
1115             return $self->_dropHandle($pid, $loop, $id);
1116             }
1117              
1118             sub _timeout_cb {
1119             my ($self, $pid) = @_;
1120             my $proc = $self->_getProcStruct($pid);
1121             return 0 unless (defined $proc);
1122              
1123             # drop timer (can't hurt...)
1124             if (defined $proc->{id_timeout}) {
1125             $self->ioloop()->drop($proc->{id_timeout});
1126             $proc->{id_timeout} = undef;
1127             }
1128              
1129             # is process still alive?
1130             return 0 unless (CORE::kill(0, $pid));
1131              
1132             $_log->debug("[process $pid]: Execution timeout ("
1133             . sprintf("%-.3f seconds).", $proc->{timeout})
1134             . " Killing process.");
1135              
1136             # kill the motherfucker!
1137             unless (CORE::kill(9, $pid)) {
1138             $_log->warn("[process $pid]: Unable to kill process: $!");
1139             }
1140              
1141             $proc->{error} = "Execution timeout.";
1142              
1143             # sigchld handler will do the rest for us...
1144             return 1;
1145             }
1146              
1147             sub _init {
1148             my $self = shift;
1149              
1150             # last error message
1151             $self->{_error} = '';
1152              
1153             # stored exec structs
1154             $self->{_data} = {};
1155            
1156             # ioloop object...
1157             $self->{_ioloop} = undef;
1158            
1159             # maximum running limit
1160             $self->{_max_running} = 0;
1161              
1162             # install SIGCHLD handler
1163             $SIG{'CHLD'} = sub { _sig_chld($self, @_) };
1164             }
1165              
1166             sub _getProcStruct {
1167             my ($self, $pid) = @_;
1168             no warnings;
1169             my $err = "[process $pid]: Unable to get process data structure: ";
1170             unless (defined $pid) {
1171             $self->{_error} = $err . "Undefined pid.";
1172             return undef;
1173             }
1174             unless (exists($self->{_data}->{$pid})
1175             && defined $self->{_data}->{$pid})
1176             {
1177             $self->{_error} = $err . "Non-managed process pid: $pid";
1178             return undef;
1179             }
1180              
1181             return $self->{_data}->{$pid};
1182             }
1183              
1184             sub _getRunStruct {
1185             my ($self, $opt) = @_;
1186             my $s = {
1187             cmd => undef,
1188             stdout_cb => undef,
1189             stderr_cb => undef,
1190             error_cb => undef,
1191             exit_cb => undef,
1192             exec_timeout => 0,
1193             };
1194              
1195             # apply user defined vars...
1196             map {
1197             if (exists($s->{$_}))
1198             {
1199             $s->{$_} = $opt->{$_};
1200             }
1201             } keys %{$opt};
1202              
1203             return $s;
1204             }
1205              
1206             sub _validateRunStruct {
1207             my ($self, $s) = @_;
1208              
1209             # command?
1210             unless (defined $s->{cmd}) { #} && length($s->{cmd}) > 0) {
1211             $self->{_error} = "Undefined command.";
1212             return 0;
1213             }
1214             # check command...
1215             my $cmd_ref = ref($s->{cmd});
1216             if ($cmd_ref eq '') {
1217             unless (length($s->{cmd}) > 0) {
1218             $self->{_error} = "Zero-length command.";
1219             return 0;
1220             }
1221             } else {
1222             unless ($cmd_ref eq 'CODE' || $cmd_ref eq 'ARRAY') {
1223             $self->{_error} = "Command can be pure scalar, arrayref or coderef.";
1224             return 0;
1225             }
1226             }
1227              
1228             # callbacks...
1229             if (defined $s->{stdout_cb} && ref($s->{stdout_cb}) ne 'CODE') {
1230             $self->{_error} = "STDOUT callback defined, but is not code reference.";
1231             return 0;
1232             }
1233             if (defined $s->{stderr_cb} && ref($s->{stderr_cb}) ne 'CODE') {
1234             $self->{_error} = "STDERR callback defined, but is not code reference.";
1235             return 0;
1236             }
1237             if (defined $s->{exit_cb} && ref($s->{exit_cb}) ne 'CODE') {
1238             $self->{_error} =
1239             "Process exit_cb callback defined, but is not code reference.";
1240             return 0;
1241             }
1242              
1243             # exec timeout
1244             { no warnings; $s->{exec_timeout} += 0; }
1245              
1246             return 1;
1247             }
1248              
1249             sub _procCleanup {
1250             my ($self, $pid, $exit_val, $signum, $core) = @_;
1251             my $proc = $self->_getProcStruct($pid);
1252             unless (defined $proc) {
1253             no warnings;
1254             $_log->warn(
1255             "Untracked process pid $pid exited with exit status $exit_val by signal $signum, core: $core."
1256             );
1257             return 0;
1258             }
1259              
1260             $_log->debug(
1261             "[process $pid]: Got SIGCHLD, " .
1262             "exited with exit status: $exit_val by signal $signum"
1263             . (($core) ? "with core dump" : "")
1264             . '.');
1265              
1266             $proc->{exit_val} = $exit_val;
1267             $proc->{exit_signal} = $signum;
1268             $proc->{exit_core} = $core;
1269              
1270             # command timings...
1271             my $te = time();
1272             $proc->{time_stopped} = $te;
1273             $proc->{time_duration_exec} = $te - $proc->{time_started};
1274              
1275             # this process is no longer running
1276             $proc->{running} = 0;
1277              
1278             # destroy timer if it was defined
1279             if (defined $proc->{id_timeout}) {
1280             $_log->debug(
1281             "[process $pid]: Removing timeout handler $proc->{id_timeout}.");
1282             $self->ioloop()->drop($proc->{id_timeout});
1283             $proc->{id_timeout} = undef;
1284             }
1285              
1286             # check if we're ready to deliver response
1287             $self->_checkIfComplete($pid);
1288             }
1289              
1290             sub _sig_chld {
1291             my ($self) = @_;
1292              
1293             # $_log->debug('SIGCHLD hander startup: ' . join(", ", @_));
1294             my $i = 0;
1295             while ((my $pid = waitpid(-1, WNOHANG)) > 0) {
1296             $i++;
1297             my $exit_val = $? >> 8;
1298             my $signum = $? & 127;
1299             my $core = $? & 128;
1300              
1301             # do process cleanup
1302             $self->_procCleanup($pid, $exit_val, $signum, $core);
1303             }
1304             $_log->debug("SIGCHLD handler cleaned up after $i process(es).")
1305             if ($i > 0);
1306             }
1307              
1308             =head1 BUGS/CAVEATS
1309              
1310             There seem to be problems on some B systems
1311             in conjunction with L implementation. Error manifests itself with the
1312             following warning message:
1313              
1314             Filehandle GEN3 opened only for input at /usr/libdata/perl5/i386-openbsd/5.10.1/IO/Handle.pm line 465.
1315              
1316             L's syswrite method is called by L's _write, but there is no good reason
1317             to write to process stdout or stderr... I'm investigating, feel free to contact me regarding this
1318             issue.
1319              
1320             =head1 AUTHOR
1321              
1322             "Brane F. Gracnar", C<< <"bfg at frost.ath.cx"> >>
1323              
1324             =head1 BUGS
1325              
1326             Please report any bugs or feature requests to C, or through
1327             the web interface at L. I will be notified, and then you'll
1328             automatically be notified of progress on your bug as I make changes.
1329              
1330             =head1 SUPPORT
1331              
1332             You can find documentation for this module with the perldoc command.
1333              
1334             perldoc MojoX::Run
1335              
1336              
1337             You can also look for information at:
1338              
1339             =over 4
1340              
1341             =item * RT: CPAN's request tracker
1342              
1343             L
1344              
1345             =item * AnnoCPAN: Annotated CPAN documentation
1346              
1347             L
1348              
1349             =item * CPAN Ratings
1350              
1351             L
1352              
1353             =item * Search CPAN
1354              
1355             L
1356              
1357             =item * Source repository
1358              
1359             L
1360              
1361             =back
1362              
1363              
1364             =head1 ACKNOWLEDGEMENTS
1365              
1366             This module was inspired by L by Rocco Caputo; module includes
1367             patched version of L from Perl distribution which allows perl coderef
1368             execution.
1369              
1370             =head1 LICENSE AND COPYRIGHT
1371              
1372             Copyright 2010-2011, Brane F. Gracnar.
1373              
1374             This program is free software; you can redistribute it and/or modify it
1375             under the terms of either: the GNU General Public License as published
1376             by the Free Software Foundation; or the Artistic License.
1377              
1378             See http://dev.perl.org/licenses/ for more information.
1379              
1380              
1381             =cut
1382              
1383             1; # End of MojoX::Run