File Coverage

blib/lib/Mojo/Run3.pm
Criterion Covered Total %
statement 174 195 89.2
branch 62 88 70.4
condition 28 50 56.0
subroutine 37 39 94.8
pod 10 10 100.0
total 311 382 81.4


line stmt bran cond sub pod time code
1             package Mojo::Run3;
2 10     10   2108817 use Mojo::Base 'Mojo::EventEmitter';
  10         125  
  10         60  
3              
4 10     10   16437 use Carp qw(croak);
  10         20  
  10         525  
5 10     10   3986 use Errno qw(EAGAIN ECONNRESET EINTR EPIPE EWOULDBLOCK EIO);
  10         11000  
  10         1017  
6 10     10   71 use IO::Handle;
  10         20  
  10         355  
7 10     10   4447 use IO::Pty;
  10         115032  
  10         531  
8 10     10   4825 use Mojo::IOLoop::ReadWriteFork::SIGCHLD;
  10         26420  
  10         64  
9 10     10   5666 use Mojo::IOLoop;
  10         1608601  
  10         61  
10 10     10   587 use Mojo::Util qw(term_escape);
  10         27  
  10         475  
11 10     10   72 use Mojo::Promise;
  10         136  
  10         66  
12 10     10   424 use POSIX qw(sysconf _SC_OPEN_MAX);
  10         24  
  10         84  
13 10     10   915 use Scalar::Util qw(blessed weaken);
  10         20  
  10         643  
14              
15 10   50 10   63 use constant DEBUG => $ENV{MOJO_RUN3_DEBUG} && 1;
  10         20  
  10         656  
16 10     10   64 use constant MAX_OPEN_FDS => sysconf(_SC_OPEN_MAX);
  10         19  
  10         34231  
17              
18             our $VERSION = '1.02';
19              
20             our @SAFE_SIG
21             = grep { !m!^(NUM\d+|__[A-Z0-9]+__|ALL|CATCHALL|DEFER|HOLD|IGNORE|MAX|PAUSE|RTMAX|RTMIN|SEGV|SETS)$! } keys %SIG;
22              
23             has driver => sub { +{stdin => 'pipe', stdout => 'pipe', stderr => 'pipe'} };
24             has ioloop => sub { Mojo::IOLoop->singleton }, weak => 1;
25              
26             sub bytes_waiting {
27 4     4 1 1248 my ($self, $name) = (@_, 'stdin');
28 4   100     43 return length($self->{buffer}{$name} // '');
29             }
30              
31             sub close {
32 131     131 1 7556 my ($self, $conduit) = @_;
33 131 100       416 return $self->_close_other if $conduit eq 'other';
34              
35 130         252 my $fh = $self->{fh};
36 130 100       427 return $self unless my $handle = $fh->{$conduit};
37              
38 68         141 $self->_d('close %s (%s)', $conduit, $fh->{$conduit} // 'undef') if DEBUG;
39 68         369 $self->_remove($handle, 1);
40 68         429 $handle->close;
41 68         2651 return $self;
42             }
43              
44 6     6 1 14477 sub exit_status { shift->status >> 8 }
45 6     6 1 238 sub handle { $_[0]->{fh}{$_[1]} }
46              
47             sub kill {
48 17     17 1 2874 my ($self, $signal) = (@_, 15);
49 17         44 $self->_d('kill %s %s', $signal, $self->{pid} // 0) if DEBUG;
50 17 100       939 return $self->{pid} ? kill $signal, $self->{pid} : -1;
51             }
52              
53             sub run_p {
54 24     24 1 48631 my ($self, $cb) = @_;
55 24         159 my $p = Mojo::Promise->new;
56 24     22   1066 $self->once(finish => sub { $p->resolve($_[0]) });
  22         1653  
57 24         710 $self->start($cb);
58 24         241 return $p;
59             }
60              
61 15   100 15 1 7153 sub pid { shift->{pid} // -1 }
62 17   100 17 1 8180 sub status { shift->{status} // -1 }
63              
64             sub start {
65 26     26 1 3488 my ($self, $cb) = @_;
66 26 50   26   103 $self->ioloop->next_tick(sub { $self and $self->_start($cb) });
  26         12239  
67 26         1600 return $self;
68             }
69              
70             sub write {
71 10   66 10 1 20920 my $cb = ref $_[-1] eq 'CODE' && pop;
72 10         74 my ($self, $chunk, $conduit) = (@_, 'stdin');
73 10 100       64 $self->once(drain => $cb) if $cb;
74 10         290 $self->{buffer}{$conduit} .= $chunk;
75 10         71 $self->_write($conduit);
76 10         116 return $self;
77             }
78              
79             sub _cleanup {
80 24     24   89 my ($self, $signal) = @_;
81 24 100       708 return unless $self->{pid};
82 13         96 $self->close($_) for qw(pty stdin stderr stdout);
83 13 50       148 $self->kill($signal) if $signal;
84             }
85              
86             sub _close_from_child {
87 71     71   262 my ($self, $conduit) = @_;
88 71         198 delete $self->{watching}{$conduit}; # $conduit can also be "pid"
89 71         144 $self->_d('closed=%s watching="%s"', $conduit, join ' ', sort keys %{$self->{watching}}) if DEBUG;
90 71 100       129 return 0 if keys %{$self->{watching}};
  71         409  
91              
92 23         83 $self->close($_) for keys %{$self->{fh}};
  23         215  
93 23         90 for my $cb (@{$self->subscribers('finish')}) {
  23         338  
94 23 50       358 $self->emit(error => $@) unless eval { $self->$cb; 1 };
  23         147  
  23         7145  
95             }
96              
97 23         775 return 1;
98             }
99              
100             sub _close_other {
101 1     1   8 my ($self) = @_;
102 1 50       16 croak "Cannot close 'other' in parent process!" if $self->pid != 0;
103              
104 0         0 my $fh = delete $self->{fh};
105 0         0 $fh->{$_}->close for keys %$fh;
106              
107 0         0 local $!;
108 0         0 for my $fileno (0 .. MAX_OPEN_FDS - 1) {
109 0 0       0 next if fileno(STDIN) == $fileno;
110 0 0       0 next if fileno(STDOUT) == $fileno;
111 0 0       0 next if fileno(STDERR) == $fileno;
112 0         0 POSIX::close($fileno);
113             }
114              
115 0         0 return $self;
116             }
117              
118             sub _d {
119 0     0   0 my ($self, $format, @val) = @_;
120 0         0 local $!; # Do not reset $! in ex _read()
121 0   0     0 warn sprintf "[run3:%s] $format\n", $self->{pid} // 0, @val;
122             }
123              
124             sub _fail {
125 2     2   29 my ($self, $err, $errno) = @_;
126 2         4 $self->_d('finish %s (%s)', $err, $errno) if DEBUG;
127 2         5 $self->{status} = $errno;
128 2         10 $self->emit(error => $err)->emit('finish');
129 2         57 $self->_cleanup;
130             }
131              
132             sub _read {
133 327     327   10042 my ($self, $name, $handle) = @_;
134              
135 327         1731 my $n_bytes = $handle->sysread(my $buf, 131072, 0);
136 327 100       27195 if ($n_bytes) {
    100          
137 271         400 $self->_d('%s >>> %s (%i)', $name, term_escape($buf) =~ s!\n!\\n!gr, $n_bytes) if DEBUG;
138 271         1144 return $self->emit($name => $buf);
139             }
140             elsif (defined $n_bytes) {
141 39         252 return $self->_remove($handle, 0)->_close_from_child($name); # EOF
142             }
143             else {
144 17         21 $self->_d('op=read conduit=%s errstr="%s" errno=%s', $name, $!, int $!) if DEBUG;
145 17 100 100     434 return undef if $! == EAGAIN || $! == EINTR || $! == EWOULDBLOCK; # Retry
      66        
146 13 100 100     141 return $self->kill if $! == ECONNRESET || $! == EPIPE; # Error
147 11 100       91 return $self->_remove($handle, 0)->_close_from_child($name) if $! == EIO; # EOF on PTY raises EIO
148 2         12 return $self->emit(error => $!);
149             }
150             }
151              
152             sub _redirect {
153 6     6   101 my ($self, $conduit, $real, $virtual) = @_;
154 6 100 50     89 return $real->close || die "Couldn't close $conduit: $!" unless $virtual;
155 5         295 $real->autoflush(1);
156 5   50     1166 return open($real, ($conduit eq 'stdin' ? '<&=' : '>&='), fileno($virtual)) || die "Couldn't dup $conduit: $!";
157             }
158              
159             sub _remove {
160 116     116   368 my ($self, $handle, $delete) = @_;
161 116         309 my $fh = $self->{fh};
162 116         447 my $reactor = $self->ioloop->reactor;
163              
164 116         1542 for my $name (keys %$fh) {
165 304 100 66     1657 next unless $fh->{$name} and $fh->{$name} eq $handle;
166 135         581 $reactor->remove($fh->{$name});
167 135 100       1656 delete $fh->{$name} if $delete;
168 135         391 delete $self->{watching}{$name};
169             }
170              
171 116         443 return $self;
172             }
173              
174             sub _start {
175 26     26   94 my ($self, $cb) = @_;
176              
177 26         101 my $options = $self->driver;
178 26 100       153 $options = {stdin => $options, stdout => 'pipe', stderr => 'pipe'} unless ref $options;
179              
180             # Prepare IPC filehandles
181 26         63 my ($pty, %child, %parent);
182 26         82 for my $conduit (qw(pty stdin stdout stderr)) {
183 102   100     361 my $driver = $options->{$conduit} // 'close';
184 102 100       368 if ($driver eq 'pty') {
    100          
185 12   66     245 $pty ||= IO::Pty->new;
186 12         7638 ($child{$conduit}, $parent{$conduit}) = ($pty->slave, $pty);
187             }
188             elsif ($driver eq 'pipe') {
189 62 100       2280 pipe my $read, my $write or return $self->_fail("Can't create pipe: $!", $!);
190 61 100       548 ($child{$conduit}, $parent{$conduit}) = $conduit eq 'stdin' ? ($read, $write) : ($write, $read);
191             }
192              
193 101         433 $self->_d('conduit=%s child=%s parent=%s', $conduit, $child{$conduit} // '', $parent{$conduit} // '') if DEBUG;
194             }
195              
196             # Child
197 25 100       43907 unless ($self->{pid} = fork) {
198 3 100       336 return $self->_fail("Can't fork: $!", $!) unless defined $self->{pid};
199 2         90 $self->{fh} = \%child;
200 2 50 50     222 $pty->make_slave_controlling_terminal if $pty and ($options->{make_slave_controlling_terminal} // 1);
      66        
201 2         1096 $_->close for values %parent;
202              
203 2         286 $self->_redirect(stdin => \*STDIN, $child{stdin});
204 2         180 $self->_redirect(stdout => \*STDOUT, $child{stdout});
205 2         52 $self->_redirect(stderr => \*STDERR, $child{stderr});
206              
207 2         977 @SIG{@SAFE_SIG} = ('DEFAULT') x @SAFE_SIG;
208 2         76 ($@, $!) = ('', 0);
209              
210 2         30 eval { $self->$cb };
  2         70  
211 0 0 0     0 my ($err, $errno) = ($@, $@ ? 255 : $! || 0);
212 0 0       0 print STDERR $err if length $err;
213 0 0       0 POSIX::_exit($errno) || exit $errno;
214             }
215              
216             # Parent
217 22         952 $self->{fh} = \%parent;
218 22 100       958 ($pty->close_slave, ($self->{fh}{pty} = $pty)) if $pty;
219 22         3068 $_->close for values %child;
220              
221 22         2317 weaken $self;
222 22         936 my $reactor = $self->ioloop->reactor;
223 22         1401 my %uniq;
224 22         295 for my $conduit (qw(pty stdout stderr)) {
225 66 100       506 next unless my $fh = $parent{$conduit};
226 52 100       1998 next if $uniq{$fh}++;
227 47 50   316   2075 $reactor->io($fh, sub { $self ? $self->_read($conduit => $fh) : $_[0]->remove($fh) });
  316         41494766  
228 47         3226 $reactor->watch($fh, 1, 0);
229 47         1081 $self->{watching}{$conduit} = 1;
230             }
231              
232 22         92 $self->_d('waitpid %s', $self->{pid}) if DEBUG;
233 22         143 $self->{watching}{pid} = 1;
234             Mojo::IOLoop::ReadWriteFork::SIGCHLD->singleton->waitpid(
235             $self->{pid} => sub {
236 22 50   22   700988 return unless $self;
237 22         519 $self->{status} = $_[0];
238 22         251 $self->_close_from_child('pid');
239             }
240 22         1305 );
241              
242 22         7442 $self->emit('spawn');
243 22         1271 $self->_write($_) for qw(pty stdin);
244             }
245              
246             sub _write {
247 54     54   315 my ($self, $conduit) = @_;
248 54 100       1901 return unless length $self->{buffer}{$conduit};
249 16 100       74 return unless my $fh = $self->{fh}{$conduit};
250              
251 8         294 my $n_bytes = $fh->syswrite($self->{buffer}{$conduit});
252 8 50       362 if (defined $n_bytes) {
253 8         84 my $buf = substr $self->{buffer}{$conduit}, 0, $n_bytes, '';
254 8         23 $self->_d('%s <<< %s (%i)', $conduit, term_escape($buf) =~ s!\n!\\n!gr, length $buf) if DEBUG;
255 8 50       148 return $self->emit('drain') unless length $self->{buffer}{$conduit};
256 0     0   0 return $self->ioloop->next_tick(sub { $self->_write });
  0         0  
257             }
258             else {
259 0         0 $self->_d('op=write conduit=%s errstr="%s" errno=%s', $conduit, $!, $!) if DEBUG;
260 0 0 0     0 return if $! == EAGAIN || $! == EINTR || $! == EWOULDBLOCK;
      0        
261 0 0 0     0 return $self->kill(9) if $! == ECONNRESET || $! == EPIPE;
262 0         0 return $self->emit(error => $!);
263             }
264             }
265              
266 22 50   22   123636 sub DESTROY { shift->_cleanup(9) unless ${^GLOBAL_PHASE} eq 'DESTRUCT' }
267              
268             1;
269              
270             =encoding utf8
271              
272             =head1 NAME
273              
274             Mojo::Run3 - Run a subprocess and read/write to it
275              
276             =head1 SYNOPSIS
277              
278             use Mojo::Base -strict, -signatures;
279             use Mojo::Run3;
280              
281             This example gets "stdout" events when the "ls" command emits output:
282              
283             use IO::Handle;
284             my $run3 = Mojo::Run3->new;
285             $run3->on(stdout => sub ($run3, $bytes) {
286             STDOUT->syswrite($bytes);
287             });
288              
289             $run3->run_p(sub { exec qw(/usr/bin/ls -l /tmp) })->wait;
290              
291             This example does the same, but on a remote host using ssh:
292              
293             my $run3 = Mojo::Run3->new
294             ->driver({pty => 'pty', stdin => 'pipe', stdout => 'pipe', stderr => 'pipe'});
295              
296             $run3->once(pty => sub ($run3, $bytes) {
297             $run3->write("my-secret-password\n", "pty") if $bytes =~ /password:/;
298             });
299              
300             $run3->on(stdout => sub ($run3, $bytes) {
301             STDOUT->syswrite($bytes);
302             });
303              
304             $run3->run_p(sub { exec qw(ssh example.com ls -l /tmp) })->wait;
305              
306             =head1 DESCRIPTION
307              
308             L<Mojo::Run3> allows you to fork a subprocess which you can write STDIN to, and
309             read STDERR and STDOUT from, without blocking the event loop.
310              
311             This module also supports L<IO::Pty> which allows you to create a
312             pseudoterminal for the child process. This is especially useful for applications
313             such as C<bash> and L<ssh>.
314              
315             This module is currently EXPERIMENTAL, but unlikely to change much.
316              
317             =head1 EVENTS
318              
319             =head2 drain
320              
321             $run3->on(drain => sub ($run3) { });
322              
323             Emitted after L</write> has written the whole buffer to the subprocess.
324              
325             =head2 error
326              
327             $run3->on(error => sub ($run3, $str) { });
328              
329             Emitted when something goes wrong.
330              
331             =head2 finish
332              
333             $run3->on(finish => sub ($run3, @) { });
334              
335             Emitted when the subprocess has ended. L</error> might be emitted before
336             L</finish>, but L</finish> will always be emitted at some point after L</start>
337             as long as the subprocess actually stops. L</status> will contain C<$!> if the
338             subprocess could not be started or the exit code from the subprocess.
339              
340             =head2 pty
341              
342             $run3->on(pty => sub ($run3, $bytes) { });
343              
344             Emitted when the subprocess writes bytes to L<IO::Pty>. See L</driver> for more
345             details.
346              
347             =head2 stderr
348              
349             $run3->on(stderr => sub ($run3, $bytes) { });
350              
351             Emitted when the subprocess writes bytes to STDERR.
352              
353             =head2 stdout
354              
355             $run3->on(stdout => sub ($run3, $bytes) { });
356              
357             Emitted when the subprocess writes bytes to STDOUT.
358              
359             =head2 spawn
360              
361             $run3->on(spawn => sub ($run3, @) { });
362              
363             Emitted in the parent process after the subprocess has been forked.
364              
365             =head1 ATTRIBUTES
366              
367             =head2 driver
368              
369             $hash_ref = $run3->driver;
370             $run3 = $self->driver({stdin => 'pipe', stdout => 'pipe', stderr => 'pipe'});
371              
372             Used to set the driver for "pty", "stdin", "stdout" and "stderr".
373              
374             Examples:
375              
376             # Open pipe for STDIN and STDOUT and close STDERR in child process
377             $self->driver({stdin => 'pipe', stdout => 'pipe'});
378              
379             # Create a PTY and attach STDIN to it and open a pipe for STDOUT and STDERR
380             $self->driver({stdin => 'pty', stdout => 'pipe', stderr => 'pipe'});
381              
382             # Create a PTY and pipes for STDIN, STDOUT and STDERR
383             $self->driver({pty => 'pty', stdin => 'pipe', stdout => 'pipe', stderr => 'pipe'});
384              
385             # Create a PTY, but do not make the PTY slave the controlling terminal
386             $self->driver({pty => 'pty', stdout => 'pipe', make_slave_controlling_terminal => 0});
387              
388             # It is not supported to set "pty" to "pipe"
389             $self->driver({pty => 'pipe'});
390              
391             =head2 ioloop
392              
393             $ioloop = $run3->ioloop;
394             $run3 = $run3->ioloop(Mojo::IOLoop->singleton);
395              
396             Holds a L<Mojo::IOLoop> object.
397              
398             =head1 METHODS
399              
400             =head2 bytes_waiting
401              
402             $int = $run3->bytes_waiting;
403              
404             Returns how many bytes have been passed on to the L</write> buffer, but not yet
405             written to the child process.
406              
407             =head2 close
408              
409             $run3 = $run3->close('other');
410             $run3 = $run3->close('stdin');
411              
412             Can be used to close C<STDIN> or other filehandles that are not in use in a sub
413             process.
414              
415             Closing "stdin" is useful after piping data into a process like C<cat>.
416              
417             Here is an example of closing "other":
418              
419             $run3->start(sub ($run3, @) {
420             $run3->close('other');
421             exec telnet => '127.0.0.1';
422             });
423              
424             Closing "other" is currently EXPERIMENTAL and might be changed later on, but it
425             is unlikely it will get removed.
426              
427             =head2 exit_status
428              
429             $int = $run3->exit_status;
430              
431             Returns the exit status part of L</status>, which should be a number from
432             0 to 255.
433              
434             =head2 handle
435              
436             $fh = $run3->handle($name);
437              
438             Returns a file handle or undef for C<$name>, which can be "stdin", "stdout",
439             "stderr" or "pty". This method returns the write or read "end" of the file
440             handle depending if it is called from the parent or child process.
441              
442             =head2 kill
443              
444             $int = $run3->kill($signal);
445              
446             Used to send a C<$signal> to the subprocess. Returns C<-1> if no process
447             exists, C<0> if the process could not be signalled and C<1> if the signal was
448             successfully sent.
449              
450             =head2 pid
451              
452             $int = $run3->pid;
453              
454             Process ID of the child after L</start> has successfully started. The PID will
455             be "0" in the child process and "-1" before the child process was started.
456              
457             =head2 run_p
458              
459             $p = $run3->run_p(sub ($run3) { ... })->then(sub ($run3) { ... });
460              
461             Will L</start> the subprocess and the promise will be fulfilled when L</finish>
462             is emitted.
463              
464             =head2 start
465              
466             $run3 = $run3->start(sub ($run3, @) { ... });
467              
468             Will start the subprocess. The code block passed in will be run in the child
469             process. C<exec> can be used if you want to run another program. Example:
470              
471             $run3 = $run3->start(sub { exec @my_other_program_with_args });
472             $run3 = $run3->start(sub { exec qw(/usr/bin/ls -l /tmp) });
473              
474             =head2 status
475              
476             $int = $run3->status;
477              
478             Holds the exit status of the program or C<$!> if the program failed to start.
479             The value includes signals and coredump flags. L</exit_status> can be used
480             instead to get the exit value from 0 to 255.
481              
482             =head2 write
483              
484             $run3 = $run3->write($bytes);
485             $run3 = $run3->write($bytes, sub ($run3) { ... });
486             $run3 = $run3->write($bytes, $conduit);
487             $run3 = $run3->write($bytes, $conduit, sub ($run3) { ... });
488              
489             Used to write C<$bytes> to the subprocess. C<$conduit> can be "pty" or "stdin",
490             and defaults to "stdin". The optional callback will be called on the next
491             L</drain> event.
492              
493             =head1 AUTHOR
494              
495             Jan Henning Thorsen
496              
497             =head1 COPYRIGHT AND LICENSE
498              
499             This program is free software, you can redistribute it and/or modify it under
500             the terms of the Artistic License version 2.0.
501              
502             =head1 SEE ALSO
503              
504             L,
505             L, L, L.
506              
507             =cut