File Coverage

blib/lib/IPC/Open3/Callback.pm
Criterion Covered Total %
statement 137 167 82.0
branch 35 52 67.3
condition 15 31 48.3
subroutine 23 29 79.3
pod 4 5 80.0
total 214 284 75.3


line stmt bran cond sub pod time code
1 2     2   45497 use strict;
  2         4  
  2         46  
2 2     2   9 use warnings;
  2         4  
  2         70  
3              
4             package IPC::Open3::Callback;
5             $IPC::Open3::Callback::VERSION = '1.19';
6             # ABSTRACT: An extension to IPC::Open3 that will feed out and err to callbacks instead of requiring the caller to handle them.
7             # PODNAME: IPC::Open3::Callback
8              
9 2     2   369 use Data::Dumper;
  2         4887  
  2         119  
10 2     2   11 use Exporter qw(import);
  2         4  
  2         45  
11 2     2   301 use Hash::Util qw(lock_keys);
  2         1956  
  2         10  
12 2     2   595 use IO::Select;
  2         2366  
  2         85  
13 2     2   581 use IO::Socket;
  2         32790  
  2         9  
14 2     2   1333 use IPC::Open3;
  2         4912  
  2         118  
15 2     2   34 use Symbol qw(gensym);
  2         5  
  2         71  
16              
17 2     2   551 use parent qw(Class::Accessor);
  2         557  
  2         13  
18             __PACKAGE__->follow_best_practice;
19             __PACKAGE__->mk_accessors(
20             qw(out_callback err_callback buffer_output select_timeout buffer_size input_buffer));
21             __PACKAGE__->mk_ro_accessors(qw(pid last_command last_exit_code));
22              
23             our @EXPORT_OK = qw(safe_open3);
24              
25             my $logger;
26             eval {
27             require Log::Log4perl;
28             $logger = Log::Log4perl->get_logger('IPC::Open3::Callback');
29             };
30             if ($@) {
31             require IPC::Open3::Callback::Logger;
32             $logger = IPC::Open3::Callback::Logger->get_logger();
33             }
34              
35             sub new {
36 7     7 1 9391 my ( $class, @args ) = @_;
37 7         34 return bless( {}, $class )->_init(@args);
38             }
39              
40             sub _append_to_buffer {
41 4     4   11 my ( $self, $buffer_ref, $data, $flush ) = @_;
42              
43 4         16 my @lines = split( /\n/, $$buffer_ref . $data, -1 );
44              
45             # save the last line in the buffer as it may not yet be a complete line
46 4 100       14 $$buffer_ref = $flush ? '' : pop(@lines);
47              
48             # return all complete lines
49 4         16 return @lines;
50             }
51              
52             sub _clear_input_buffer {
53 0     0   0 my ($self) = shift;
54 0         0 delete( $self->{input_buffer} );
55             }
56              
57             sub DESTROY {
58 7     7   15591 my ($self) = shift;
59 7         20 $self->_destroy_child();
60             }
61              
62             sub _destroy_child {
63 17     17   33 my $self = shift;
64              
65 17         60 my $pid = $self->get_pid();
66 17 100       204 if ($pid) {
67 10         147 waitpid( $pid, 0 );
68 10         100 $self->_set_last_exit_code( $? >> 8 );
69              
70             $logger->debug(
71             sub {
72 0     0   0 "Exited '",
73             $self->get_last_command(),
74             "' with code ",
75             $self->get_last_exit_code();
76             }
77 10         156 );
78 10         139 $self->_set_pid();
79             }
80              
81 17         482 return $self->{last_exit_code};
82             }
83              
84             sub _init {
85 7     7   16 my ( $self, $args_ref ) = @_;
86              
87 7         31 $self->{buffer_output} = undef;
88 7         12 $self->{buffer_size} = undef;
89 7         12 $self->{err_callback} = undef;
90 7         17 $self->{input_buffer} = undef;
91 7         14 $self->{last_command} = undef;
92 7         11 $self->{last_exit_code} = undef;
93 7         11 $self->{out_callback} = undef;
94 7         18 $self->{pid} = undef;
95 7         11 $self->{select_timeout} = undef;
96 7         10 lock_keys( %{$self} );
  7         39  
97              
98 7 100       90 if ( defined($args_ref) ) {
99 1 50       4 $logger->logdie('parameters must be an hash reference')
100             unless ( ( ref($args_ref) ) eq 'HASH' );
101 1         2 $self->{out_callback} = $args_ref->{out_callback};
102 1         2 $self->{err_callback} = $args_ref->{err_callback};
103 1         8 $self->{buffer_output} = $args_ref->{buffer_output};
104 1   50     5 $self->{select_timeout} = $args_ref->{select_timeout} || 3;
105 1   50     4 $self->{buffer_size} = $args_ref->{buffer_size} || 1024;
106             }
107             else {
108 6         13 $self->{select_timeout} = 3;
109 6         9 $self->{buffer_size} = 1024;
110             }
111              
112 7         31 return $self;
113             }
114              
115             sub _nix_open3 {
116 12     12   35 my ( $in_read, $out_write, $err_write, @command ) = @_;
117 12         23 my ( $in_write, $out_read, $err_read );
118              
119 12 100       26 if ( !$in_read ) {
120 11         65 $in_read = gensym();
121 11         198 $in_write = $in_read;
122             }
123 12 100       26 if ( !$out_write ) {
124 11         27 $out_read = gensym();
125 11         111 $out_write = $out_read;
126             }
127 12 50       26 if ( !$err_write ) {
128 12         26 $err_read = gensym();
129 12         111 $err_write = $err_read;
130             }
131              
132 12         47 return ( open3( $in_read, $out_write, $err_write, @command ),
133             $in_write, $out_read, $err_read );
134             }
135              
136             sub run_command {
137 11     11 1 2905 my ( $self, @command ) = @_;
138              
139             # if last arg is hashref, its command options not arg...
140 11         23 my $options = {};
141 11 100       42 if ( ref( $command[-1] ) eq 'HASH' ) {
142 10         25 $options = pop(@command);
143             }
144              
145 11         20 my ($out_callback, $out_buffer_ref, $err_callback,
146             $err_buffer_ref, $buffer_size, $select_timeout
147             );
148 11   100     53 $out_callback = $options->{out_callback} || $self->get_out_callback();
149 11   66     109 $err_callback = $options->{err_callback} || $self->get_err_callback();
150 11 100 66     197 if ( $options->{buffer_output} || $self->get_buffer_output() ) {
151 2         10 my $out_temp = '';
152 2         3 my $err_temp = '';
153 2         6 $out_buffer_ref = \$out_temp;
154 2         4 $err_buffer_ref = \$err_temp;
155             }
156 11   33     130 $buffer_size = $options->{buffer_size} || $self->get_buffer_size();
157 11   33     147 $select_timeout = $options->{select_timeout} || $self->get_select_timeout();
158              
159 11         135 $self->_set_last_command( \@command );
160 11         37 $logger->debug( "Running '", $self->get_last_command(), "'" );
161             my ( $pid, $in_fh, $out_fh, $err_fh ) = safe_open3_with(
162             $options->{in_handle},
163             $options->{out_handle},
164             $options->{err_handle}, @command
165 11         246 );
166 10         31032 $self->_set_pid($pid);
167              
168 10         139 my $select = IO::Select->new();
169 10         175 $select->add( $out_fh, $err_fh );
170 10         792 while ( my @ready = $select->can_read($select_timeout) ) {
171 19 50       1913 if ( $self->get_input_buffer() ) {
172 0         0 syswrite( $in_fh, $self->get_input_buffer() );
173 0         0 $self->_clear_input_buffer();
174             }
175 19         352 foreach my $fh (@ready) {
176 28         57 my $line;
177 28         159 my $bytes_read = sysread( $fh, $line, $buffer_size );
178 28 50 33     314 if ( !defined($bytes_read) && !$!{ECONNRESET} ) {
    100 66        
179 0     0   0 $logger->error( "sysread failed: ", sub { Dumper(%!) } );
  0         0  
180 0         0 $logger->logdie( "error in running '", $self->get_last_command(), "': ", $! );
181             }
182             elsif ( !defined($bytes_read) || $bytes_read == 0 ) {
183 19         79 $select->remove($fh);
184 19         811 next;
185             }
186             else {
187 9 50 33     52 if ( $out_fh && $fh == $out_fh ) {
    0 0        
188 9         48 $self->_write_to_callback( $out_callback, $line, $out_buffer_ref, 0 );
189             }
190             elsif ( $err_fh && $fh == $err_fh ) {
191 0         0 $self->_write_to_callback( $err_callback, $line, $err_buffer_ref, 0 );
192             }
193             else {
194 0         0 $logger->logdie('Impossible... somehow got a filehandle I dont know about!');
195             }
196             }
197             }
198             }
199              
200             # flush buffers
201 10         114 $self->_write_to_callback( $out_callback, '', $out_buffer_ref, 1 );
202 10         31 $self->_write_to_callback( $err_callback, '', $err_buffer_ref, 1 );
203              
204 10         31 return $self->_destroy_child();
205             }
206              
207             sub safe_open3 {
208 1     1 1 617 return safe_open3_with( undef, undef, undef, @_ );
209             }
210              
211             sub safe_open3_with {
212 12     12 1 68 my ( $in_handle, $out_handle, $err_handle, @command ) = @_;
213              
214 12 100       66 my @args = (
    100          
    50          
215             $in_handle ? '<&' . fileno($in_handle) : undef,
216             $out_handle ? '>&' . fileno($out_handle) : undef,
217             $err_handle ? '>&' . fileno($err_handle) : undef, @command
218             );
219 12 50       78 return ( $^O =~ /MSWin32/ ) ? _win_open3(@args) : _nix_open3(@args);
220             }
221              
222             sub send_input {
223 0     0 0 0 my ($self) = @_;
224 0         0 $self->set_input_buffer(shift);
225             }
226              
227             sub _set_last_command {
228 11     11   36 my ( $self, $command_ref ) = @_;
229              
230 11 50       32 $logger->logdie('the command parameter must be an array reference')
231             unless ( ( ref($command_ref) ) eq 'ARRAY' );
232              
233 11         18 $self->{last_command} = join( ' ', @{$command_ref} );
  11         51  
234             }
235              
236             sub _set_last_exit_code {
237 10     10   23 my ( $self, $code ) = @_;
238 10         30 $self->{last_exit_code} = $code;
239             }
240              
241             sub _set_pid {
242 20     20   56 my ( $self, $pid ) = @_;
243              
244 20 100       143 if ( !defined($pid) ) {
    50          
245 10         31 delete( $self->{pid} );
246             }
247             elsif ( $pid !~ /^\d+$/ ) {
248 0         0 $logger->logdie('the parameter must be an integer');
249             }
250             else {
251 10         40 $self->{pid} = $pid;
252             }
253             }
254              
255             sub _win_open3 {
256 0     0   0 my ( $in_read, $out_write, $err_write, @command ) = @_;
257              
258 0         0 my ($in_pipe_read, $in_pipe_write, $out_pipe_read,
259             $out_pipe_write, $err_pipe_read, $err_pipe_write
260             );
261 0 0       0 if ( !$in_read ) {
262 0         0 ( $in_pipe_read, $in_pipe_write ) = _win_pipe();
263 0         0 $in_read = '>&' . fileno($in_pipe_read);
264             }
265 0 0       0 if ( !$out_write ) {
266 0         0 ( $out_pipe_read, $out_pipe_write ) = _win_pipe();
267 0         0 $out_write = '<&' . fileno($out_pipe_write);
268             }
269 0 0       0 if ( !$err_write ) {
270 0         0 ( $err_pipe_read, $err_pipe_write ) = _win_pipe();
271 0         0 $err_write = '<&' . fileno($err_pipe_write);
272             }
273              
274 0         0 my $pid = open3( $in_read, $out_write, $err_write, @command );
275              
276 0         0 return ( $pid, $in_pipe_write, $out_pipe_read, $err_pipe_read );
277             }
278              
279             sub _win_pipe {
280 0     0   0 my ( $read, $write ) = IO::Socket->socketpair( AF_UNIX, SOCK_STREAM, PF_UNSPEC );
281 0         0 $read->shutdown(SHUT_WR); # No more writing for reader
282 0         0 $write->shutdown(SHUT_RD); # No more reading for writer
283              
284 0         0 return ( $read, $write );
285             }
286              
287             sub _write_to_callback {
288 29     29   90 my ( $self, $callback, $data, $buffer_ref, $flush ) = @_;
289              
290 29 100       80 return if ( !defined($callback) );
291              
292 20         63 my $pid = $self->get_pid();
293 20 100       313 if ( !defined($buffer_ref) ) {
294 16         30 &{$callback}( $data, $pid );
  16         83  
295 16         94 return;
296             }
297              
298 4         14 foreach my $line ( $self->_append_to_buffer( $buffer_ref, $data, $flush ) ) {
299 6         169 &{$callback}( $line, $pid );
  6         14  
300             }
301             }
302              
303             1;
304              
305             __END__