File Coverage

blib/lib/Mojo/Run3.pm
Criterion Covered Total %
statement 166 186 89.2
branch 61 88 69.3
condition 28 50 56.0
subroutine 36 38 94.7
pod 10 10 100.0
total 301 372 80.9


line stmt bran cond sub pod time code
1             package Mojo::Run3;
2 10     10   2102332 use Mojo::Base 'Mojo::EventEmitter';
  10         129  
  10         59  
3              
4 10     10   16492 use Carp qw(croak);
  10         21  
  10         543  
5 10     10   4213 use Errno qw(EAGAIN ECONNRESET EINTR EPIPE EWOULDBLOCK EIO);
  10         10811  
  10         983  
6 10     10   69 use IO::Handle;
  10         19  
  10         366  
7 10     10   4410 use IO::Pty;
  10         114960  
  10         524  
8 10     10   4773 use Mojo::IOLoop::ReadWriteFork::SIGCHLD;
  10         26493  
  10         59  
9 10     10   5226 use Mojo::IOLoop;
  10         1602182  
  10         69  
10 10     10   598 use Mojo::Util qw(term_escape);
  10         26  
  10         475  
11 10     10   81 use Mojo::Promise;
  10         147  
  10         60  
12 10     10   304 use POSIX qw(sysconf _SC_OPEN_MAX);
  10         26  
  10         78  
13 10     10   862 use Scalar::Util qw(blessed weaken);
  10         41  
  10         600  
14              
15 10   50 10   61 use constant DEBUG => $ENV{MOJO_RUN3_DEBUG} && 1;
  10         21  
  10         664  
16 10     10   63 use constant MAX_OPEN_FDS => sysconf(_SC_OPEN_MAX);
  10         20  
  10         32861  
17              
18             our $VERSION = '1.01';
19              
20             our @SAFE_SIG
21             = grep { !m!^(NUM\d+|__[A-Z0-9]+__|ALL|CATCHALL|DEFER|HOLD|IGNORE|MAX|PAUSE|RTMAX|RTMIN|SEGV|SETS)$! } keys %SIG;
22              
23             has driver => sub { +{stdin => 'pipe', stdout => 'pipe', stderr => 'pipe'} };
24             has ioloop => sub { Mojo::IOLoop->singleton }, weak => 1;
25              
26             sub bytes_waiting {
27 4     4 1 1338 my ($self, $name) = (@_, 'stdin');
28 4   100     57 return length($self->{buffer}{$name} // '');
29             }
30              
31             sub close {
32 133     133 1 7661 my ($self, $name) = @_;
33 133 100       549 return $self->_close_other if $name eq 'other';
34              
35 132         288 my $fh = $self->{fh};
36 132 100       544 return $self unless my $handle = $fh->{$name};
37              
38 61         293 my $reactor = $self->ioloop->reactor;
39 61         698 $self->_d('close %s (%s)', $name, $fh->{$name} // 'undef') if DEBUG;
40              
41 61         509 for my $sibling (keys %$fh) {
42 140 100 66     1758 $reactor->remove(delete $fh->{$sibling}) if $fh->{$sibling} and $fh->{$sibling} eq $handle;
43             }
44              
45 61         777 $handle->close;
46 61         2193 return $self;
47             }
48              
49 6     6 1 14818 sub exit_status { shift->status >> 8 }
50 6     6 1 273 sub handle { $_[0]->{fh}{$_[1]} }
51              
52             sub kill {
53 17     17 1 3166 my ($self, $signal) = (@_, 15);
54 17         50 $self->_d('kill %s %s', $signal, $self->{pid} // 0) if DEBUG;
55 17 100       933 return $self->{pid} ? kill $signal, $self->{pid} : -1;
56             }
57              
58             sub run_p {
59 23     23 1 46335 my ($self, $cb) = @_;
60 23         142 my $p = Mojo::Promise->new;
61 23     21   1087 $self->once(finish => sub { $p->resolve($_[0]) });
  21         1951  
62 23         669 $self->start($cb);
63 23         208 return $p;
64             }
65              
66 15   100 15 1 10491 sub pid { shift->{pid} // -1 }
67 83   100 83 1 8447 sub status { shift->{status} // -1 }
68              
69             sub start {
70 25     25 1 3674 my ($self, $cb) = @_;
71 25 50   25   168 $self->ioloop->next_tick(sub { $self and $self->_start($cb) });
  25         12234  
72 25         1576 return $self;
73             }
74              
75             sub write {
76 10   66 10 1 19500 my $cb = ref $_[-1] eq 'CODE' && pop;
77 10         71 my ($self, $chunk, $conduit) = (@_, 'stdin');
78 10 100       118 $self->once(drain => $cb) if $cb;
79 10         274 $self->{buffer}{$conduit} .= $chunk;
80 10         71 $self->_write($conduit);
81 10         121 return $self;
82             }
83              
84             sub _cleanup {
85 24     24   232 my ($self, $signal) = @_;
86 24 100       943 return unless $self->{pid};
87 13         132 $self->close($_) for qw(pty stdin stderr stdout);
88 13 50       205 $self->kill($signal) if $signal;
89             }
90              
91             sub _close_other {
92 1     1   8 my ($self) = @_;
93 1 50       13 croak "Cannot close 'other' in parent process!" if $self->pid != 0;
94              
95 0         0 my $fh = delete $self->{fh};
96 0         0 $fh->{$_}->close for keys %$fh;
97              
98 0         0 local $!;
99 0         0 for my $fileno (0 .. MAX_OPEN_FDS - 1) {
100 0 0       0 next if fileno(STDIN) == $fileno;
101 0 0       0 next if fileno(STDOUT) == $fileno;
102 0 0       0 next if fileno(STDERR) == $fileno;
103 0         0 POSIX::close($fileno);
104             }
105              
106 0         0 return $self;
107             }
108              
109             sub _d {
110 0     0   0 my ($self, $format, @val) = @_;
111 0   0     0 warn sprintf "[run3:%s] $format\n", $self->{pid} // 0, @val;
112             }
113              
114             sub _fail {
115 2     2   29 my ($self, $err, $errno) = @_;
116 2         3 $self->_d('finish %s (%s)', $err, $errno) if DEBUG;
117 2         6 $self->{status} = $errno;
118 2         23 $self->emit(error => $err)->emit('finish');
119 2         91 $self->_cleanup;
120             }
121              
122             sub _maybe_finish {
123 66     66   230 my ($self, $reason) = @_;
124 66         283 my $status = $self->status;
125              
126 66         170 if (DEBUG) {
127             $self->_d('finish=%s status=%s stdout=%s stderr=%s pty=%s',
128             $reason, $status, map { ref $self->{fh}{$_} } qw(stdout stderr pty));
129             }
130              
131 66 100       369 return 0 if $status == -1;
132 22 50       102 return 0 if grep { $self->{fh}{$_} } qw(pty stdout stderr);
  66         431  
133              
134 22         143 $self->close('stdin');
135 22         79 for my $cb (@{$self->subscribers('finish')}) {
  22         335  
136 22 50       409 $self->emit(error => $@) unless eval { $self->$cb; 1 };
  22         159  
  22         8024  
137             }
138              
139 22         682 return 1;
140             }
141              
142             sub _read {
143 318     318   9313 my ($self, $name, $handle) = @_;
144              
145 318         1449 my $n_bytes = $handle->sysread(my $buf, 131072, 0);
146 318 100       27371 if ($n_bytes) {
    100          
147 268         405 $self->_d('%s >>> %s (%i)', $name, term_escape($buf) =~ s!\n!\\n!gr, $n_bytes) if DEBUG;
148 268         1232 return $self->emit($name => $buf);
149             }
150             elsif (defined $n_bytes) {
151 36         179 return $self->close($name)->_maybe_finish($name); # EOF
152             }
153             else {
154 14         79 $self->close($name);
155 14         25 $self->_d('op=read conduit=%s errstr="%s" errno=%s', $name, $!, int $!) if DEBUG;
156 14 100 100     349 return undef if $! == EAGAIN || $! == EINTR || $! == EWOULDBLOCK; # Retry
      66        
157 11 100 100     126 return $self->kill if $! == ECONNRESET || $! == EPIPE; # Error
158 9 100       73 return $self->_maybe_finish($name) if $! == EIO; # EOF on PTY raises EIO
159 1         8 return $self->emit(error => $!);
160             }
161             }
162              
163             sub _redirect {
164 6     6   102 my ($self, $conduit, $real, $virtual) = @_;
165 6 100 50     74 return $real->close || die "Couldn't close $conduit: $!" unless $virtual;
166 5         204 $real->autoflush(1);
167 5   50     852 return open($real, ($conduit eq 'stdin' ? '<&=' : '>&='), fileno($virtual)) || die "Couldn't dup $conduit: $!";
168             }
169              
170             sub _start {
171 25     25   86 my ($self, $cb) = @_;
172              
173 25         161 my $options = $self->driver;
174 25 100       186 $options = {stdin => $options, stdout => 'pipe', stderr => 'pipe'} unless ref $options;
175              
176             # Prepare IPC filehandles
177 25         71 my ($pty, %child, %parent);
178 25         71 for my $conduit (qw(pty stdin stdout stderr)) {
179 98   100     400 my $driver = $options->{$conduit} // 'close';
180 98 100       364 if ($driver eq 'pty') {
    100          
181 11   66     199 $pty ||= IO::Pty->new;
182 11         7348 ($child{$conduit}, $parent{$conduit}) = ($pty->slave, $pty);
183             }
184             elsif ($driver eq 'pipe') {
185 60 100       2120 pipe my $read, my $write or return $self->_fail("Can't create pipe: $!", $!);
186 59 100       467 ($child{$conduit}, $parent{$conduit}) = $conduit eq 'stdin' ? ($read, $write) : ($write, $read);
187             }
188              
189 97         336 $self->_d('conduit=%s child=%s parent=%s', $conduit, $child{$conduit} // '', $parent{$conduit} // '') if DEBUG;
190             }
191              
192             # Child
193 24 100       41184 unless ($self->{pid} = fork) {
194 3 100       278 return $self->_fail("Can't fork: $!", $!) unless defined $self->{pid};
195 2         79 $self->{fh} = \%child;
196 2 50 50     179 $pty->make_slave_controlling_terminal if $pty and ($options->{make_slave_controlling_terminal} // 1);
      66        
197 2         1209 $_->close for values %parent;
198              
199 2         193 $self->_redirect(stdin => \*STDIN, $child{stdin});
200 2         42 $self->_redirect(stdout => \*STDOUT, $child{stdout});
201 2         47 $self->_redirect(stderr => \*STDERR, $child{stderr});
202              
203 2         978 @SIG{@SAFE_SIG} = ('DEFAULT') x @SAFE_SIG;
204 2         69 ($@, $!) = ('', 0);
205              
206 2         35 eval { $self->$cb };
  2         64  
207 0 0 0     0 my ($err, $errno) = ($@, $@ ? 255 : $! || 0);
208 0 0       0 print STDERR $err if length $err;
209 0 0       0 POSIX::_exit($errno) || exit $errno;
210             }
211              
212             # Parent
213 21         1289 $self->{fh} = \%parent;
214 21 100       850 ($pty->close_slave, ($self->{fh}{pty} = $pty)) if $pty;
215 21         2818 $_->close for values %child;
216              
217 21         2129 weaken $self;
218 21         756 my $reactor = $self->ioloop->reactor;
219 21         1034 my %uniq;
220 21         357 for my $conduit (qw(pty stdout stderr)) {
221 63 100       670 next unless my $fh = $parent{$conduit};
222 49 100       962 next if $uniq{$fh}++;
223 44 50   310   2371 $reactor->io($fh, sub { $self ? $self->_read($conduit => $fh) : $_[0]->remove($fh) });
  310         42511882  
224 44         3168 $reactor->watch($fh, 1, 0);
225             }
226              
227 21         228 $self->_d('waitpid %s', $self->{pid}) if DEBUG;
228             Mojo::IOLoop::ReadWriteFork::SIGCHLD->singleton->waitpid(
229             $self->{pid} => sub {
230 21 50   21   708199 return unless $self;
231 21         427 $self->{status} = $_[0];
232 21         167 $self->_maybe_finish('waitpid');
233             }
234 21         1156 );
235              
236 21         7119 $self->emit('spawn');
237 21         1126 $self->_write($_) for qw(pty stdin);
238             }
239              
240             sub _write {
241 52     52   346 my ($self, $conduit) = @_;
242 52 100       2013 return unless length $self->{buffer}{$conduit};
243 16 100       140 return unless my $fh = $self->{fh}{$conduit};
244              
245 8         315 my $n_bytes = $fh->syswrite($self->{buffer}{$conduit});
246 8 50       400 if (defined $n_bytes) {
247 8         107 my $buf = substr $self->{buffer}{$conduit}, 0, $n_bytes, '';
248 8         38 $self->_d('%s <<< %s (%i)', $conduit, term_escape($buf) =~ s!\n!\\n!gr, length $buf) if DEBUG;
249 8 50       149 return $self->emit('drain') unless length $self->{buffer}{$conduit};
250 0     0   0 return $self->ioloop->next_tick(sub { $self->_write });
  0         0  
251             }
252             else {
253 0         0 $self->_d('op=write conduit=%s errstr="%s" errno=%s', $conduit, $!, $!) if DEBUG;
254 0 0 0     0 return if $! == EAGAIN || $! == EINTR || $! == EWOULDBLOCK;
      0        
255 0 0 0     0 return $self->kill(9) if $! == ECONNRESET || $! == EPIPE;
256 0         0 return $self->emit(error => $!);
257             }
258             }
259              
260 22 50   22   130601 sub DESTROY { shift->_cleanup(9) unless ${^GLOBAL_PHASE} eq 'DESTRUCT' }
261              
262             1;
263              
264             =encoding utf8
265              
266             =head1 NAME
267              
268             Mojo::Run3 - Run a subprocess and read/write to it
269              
270             =head1 SYNOPSIS
271              
272             use Mojo::Base -strict, -signatures;
273             use Mojo::Run3;
274              
275             This example gets "stdout" events when the "ls" command emits output:
276              
277             use IO::Handle;
278             my $run3 = Mojo::Run3->new;
279             $run3->on(stdout => sub ($run3, $bytes) {
280             STDOUT->syswrite($bytes);
281             });
282              
283             $run3->run_p(sub { exec qw(/usr/bin/ls -l /tmp) })->wait;
284              
285             This example does the same, but on a remote host using ssh:
286              
287             my $run3 = Mojo::Run3->new
288             ->driver({pty => 'pty', stdin => 'pipe', stdout => 'pipe', stderr => 'pipe'});
289              
290             $run3->once(pty => sub ($run3, $bytes) {
291             $run3->write("my-secret-password\n", "pty") if $bytes =~ /password:/;
292             });
293              
294             $run3->on(stdout => sub ($run3, $bytes) {
295             STDOUT->syswrite($bytes);
296             });
297              
298             $run3->run_p(sub { exec qw(ssh example.com ls -l /tmp) })->wait;
299              
300             =head1 DESCRIPTION
301              
302             L<Mojo::Run3> allows you to fork a subprocess which you can write STDIN to, and
303             read STDERR and STDOUT without blocking the event loop.
304              
305             This module also supports L<IO::Pty> which allows you to create a
306             pseudoterminal for the child process. This is especially useful for applications
307             such as C<bash> and L<ssh>.
308              
309             This module is currently EXPERIMENTAL, but unlikely to change much.
310              
311             =head1 EVENTS
312              
313             =head2 drain
314              
315             $run3->on(drain => sub ($run3) { });
316              
317             Emitted after L</write> has written the whole buffer to the subprocess.
318              
319             =head2 error
320              
321             $run3->on(error => sub ($run3, $str) { });
322              
323             Emitted when something goes wrong.
324              
325             =head2 finish
326              
327             $run3->on(finish => sub ($run3, @) { });
328              
329             Emitted when the subprocess has ended. L</error> might be emitted before
330             L</finish>, but L</finish> will always be emitted at some point after L</start>
331             as long as the subprocess actually stops. L</status> will contain C<$!> if the
332             subprocess could not be started or the exit code from the subprocess.
333              
334             =head2 pty
335              
336             $run3->on(pty => sub ($run3, $bytes) { });
337              
338             Emitted when the subprocess writes bytes to L<IO::Pty>. See L</driver> for more
339             details.
340              
341             =head2 stderr
342              
343             $run3->on(stderr => sub ($run3, $bytes) { });
344              
345             Emitted when the subprocess writes bytes to STDERR.
346              
347             =head2 stdout
348              
349             $run3->on(stdout => sub ($run3, $bytes) { });
350              
351             Emitted when the subprocess writes bytes to STDOUT.
352              
353             =head2 spawn
354              
355             $run3->on(spawn => sub ($run3, @) { });
356              
357             Emitted in the parent process after the subprocess has been forked.
358              
359             =head1 ATTRIBUTES
360              
361             =head2 driver
362              
363             $hash_ref = $run3->driver;
364             $run3 = $self->driver({stdin => 'pipe', stdout => 'pipe', stderr => 'pipe'});
365              
366             Used to set the driver for "pty", "stdin", "stdout" and "stderr".
367              
368             Examples:
369              
370             # Open pipe for STDIN and STDOUT and close STDERR in child process
371             $self->driver({stdin => 'pipe', stdout => 'pipe'});
372              
373             # Create a PTY and attach STDIN to it and open a pipe for STDOUT and STDERR
374             $self->driver({stdin => 'pty', stdout => 'pipe', stderr => 'pipe'});
375              
376             # Create a PTY and pipes for STDIN, STDOUT and STDERR
377             $self->driver({pty => 'pty', stdin => 'pipe', stdout => 'pipe', stderr => 'pipe'});
378              
379             # Create a PTY, but do not make the PTY slave the controlling terminal
380             $self->driver({pty => 'pty', stdout => 'pipe', make_slave_controlling_terminal => 0});
381              
382             # It is not supported to set "pty" to "pipe"
383             $self->driver({pty => 'pipe'});
384              
385             =head2 ioloop
386              
387             $ioloop = $run3->ioloop;
388             $run3 = $run3->ioloop(Mojo::IOLoop->singleton);
389              
390             Holds a L<Mojo::IOLoop> object.
391              
392             =head1 METHODS
393              
394             =head2 bytes_waiting
395              
396             $int = $run3->bytes_waiting;
397              
398             Returns how many bytes have been passed on to the L</write> buffer, but not yet
399             written to the child process.
400              
401             =head2 close
402              
403             $run3 = $run3->close('other');
404             $run3 = $run3->close('stdin');
405              
406             Can be used to close C<STDIN> or other filehandles that are not in use in a sub
407             process.
408              
409             Closing "stdin" is useful after piping data into a process like C<cat>.
410              
411             Here is an example of closing "other":
412              
413             $run3->start(sub ($run3, @) {
414             $run3->close('other');
415             exec telnet => '127.0.0.1';
416             });
417              
418             Closing "other" is currently EXPERIMENTAL and might be changed later on, but it
419             is unlikely it will get removed.
420              
421             =head2 exit_status
422              
423             $int = $run3->exit_status;
424              
425             Returns the exit status part of L</status>, which should be a number from
426             0 to 255.
427              
428             =head2 handle
429              
430             $fh = $run3->handle($name);
431              
432             Returns a file handle or undef for C<$name>, which can be "stdin", "stdout",
433             "stderr" or "pty". This method returns the write or read "end" of the file
434             handle depending if it is called from the parent or child process.
435              
436             =head2 kill
437              
438             $int = $run3->kill($signal);
439              
440             Used to send a C<$signal> to the subprocess. Returns C<-1> if no process
441             exists, C<0> if the process could not be signalled and C<1> if the signal was
442             successfully sent.
443              
444             =head2 pid
445              
446             $int = $run3->pid;
447              
448             Process ID of the child after L</start> has successfully started. The PID will
449             be "0" in the child process and "-1" before the child process was started.
450              
451             =head2 run_p
452              
453             $p = $run3->run_p(sub ($run3) { ... })->then(sub ($run3) { ... });
454              
455             Will L</start> the subprocess and the promise will be fulfilled when L</finish>
456             is emitted.
457              
458             =head2 start
459              
460             $run3 = $run3->start(sub ($run3, @) { ... });
461              
462             Will start the subprocess. The code block passed in will be run in the child
463             process. C<exec> can be used if you want to run another program. Example:
464              
465             $run3 = $run3->start(sub { exec @my_other_program_with_args });
466             $run3 = $run3->start(sub { exec qw(/usr/bin/ls -l /tmp) });
467              
468             =head2 status
469              
470             $int = $run3->status;
471              
472             Holds the exit status of the program or C<$!> if the program failed to start.
473             The value includes signals and coredump flags. L</exit_status> can be used
474             instead to get the exit value from 0 to 255.
475              
476             =head2 write
477              
478             $run3 = $run3->write($bytes);
479             $run3 = $run3->write($bytes, sub ($run3) { ... });
480             $run3 = $run3->write($bytes, $conduit);
481             $run3 = $run3->write($bytes, $conduit, sub ($run3) { ... });
482              
483             Used to write C<$bytes> to the subprocess. C<$conduit> can be "pty" or "stdin",
484             and defaults to "stdin". The optional callback will be called on the next
485             L</drain> event.
486              
487             =head1 AUTHOR
488              
489             Jan Henning Thorsen
490              
491             =head1 COPYRIGHT AND LICENSE
492              
493             This program is free software, you can redistribute it and/or modify it under
494             the terms of the Artistic License version 2.0.
495              
496             =head1 SEE ALSO
497              
498             L,
499             L, L.
500              
501             =cut