File Coverage

blib/lib/IPC/Simple.pm
Criterion Covered Total %
statement 154 176 87.5
branch 22 36 61.1
condition 5 12 41.6
subroutine 41 49 83.6
pod 12 18 66.6
total 234 291 80.4


line stmt bran cond sub pod time code
1             package IPC::Simple;
2             # ABSTRACT: simple, non-blocking IPC
3             $IPC::Simple::VERSION = '0.09';
4              
5 4     4   307937 use strict;
  4         46  
  4         118  
6 4     4   21 use warnings;
  4         7  
  4         105  
7              
8 4     4   21 use Carp;
  4         6  
  4         189  
9 4     4   33 use AnyEvent qw();
  4         7  
  4         88  
10 4     4   3020 use AnyEvent::Handle qw();
  4         82329  
  4         128  
11 4     4   2333 use IPC::Open3 qw(open3);
  4         16771  
  4         249  
12 4     4   2138 use POSIX qw(:sys_wait_h);
  4         26995  
  4         21  
13 4     4   6211 use Symbol qw(gensym);
  4         14  
  4         216  
14              
15 4     4   1927 use IPC::Simple::Channel qw();
  4         10  
  4         90  
16 4     4   1708 use IPC::Simple::Group qw();
  4         12  
  4         86  
17 4     4   1667 use IPC::Simple::Message;
  4         10  
  4         226  
18              
19 4     4   24 use constant STATE_READY => 0;
  4         7  
  4         229  
20 4     4   25 use constant STATE_RUNNING => 1;
  4         8  
  4         207  
21 4     4   25 use constant STATE_STOPPING => 2;
  4         20  
  4         211  
22              
23             BEGIN{
24 4     4   26 use base 'Exporter';
  4         6  
  4         395  
25 4     4   7858 our @EXPORT_OK = qw(
26             spawn
27             process_group
28             );
29             }
30              
31             #-------------------------------------------------------------------------------
32             # Convenience constructor
33             #-------------------------------------------------------------------------------
34             sub spawn ($;%) {
35 5     5 1 366 my ($cmd, @args) = @_;
36 5         42 return IPC::Simple->new(cmd => $cmd, @args);
37             }
38              
39             sub process_group {
40 1     1 1 12 return IPC::Simple::Group->new(@_);
41             }
42              
43             #-------------------------------------------------------------------------------
44             # Constructor
45             #-------------------------------------------------------------------------------
46             sub new {
47 5     5 0 20 my ($class, %param) = @_;
48 5 50       25 my $cmd = ref $param{cmd} ? $param{cmd} : [ $param{cmd} ];
49 5 50       20 my $eol = defined $param{eol} ? $param{eol} : "\n";
50 5   66     36 my $name = $param{name} || "@$cmd";
51 5         10 my $recv_cb = $param{recv_cb};
52 5         9 my $term_cb = $param{term_cb};
53              
54 5         61 bless{
55             name => $name,
56             cmd => $cmd,
57             eol => $eol,
58             recv_cb => $recv_cb,
59             term_cb => $term_cb,
60             run_state => STATE_READY,
61             pid => undef,
62             handle_in => undef,
63             handle_out => undef,
64             handle_err => undef,
65             exit_status => undef,
66             exit_code => undef,
67             messages => undef,
68             kill_timer => undef,
69             }, $class;
70             }
71              
72             #-------------------------------------------------------------------------------
73             # State accessor and predicates
74             #-------------------------------------------------------------------------------
75             sub run_state {
76 44     44 0 86 my $self = shift;
77              
78 44 100       120 if (@_) {
79 13         37 my $new_state = shift;
80 13         69 $self->debug('run state changed to %d', $new_state);
81 13         42 $self->{run_state} = $new_state;
82             }
83              
84 44         204 return $self->{run_state};
85             }
86              
87 11     11 0 44 sub is_ready { $_[0]->run_state == STATE_READY }
88 14     14 0 6080 sub is_running { $_[0]->run_state == STATE_RUNNING }
89 6     6 0 18 sub is_stopping { $_[0]->run_state == STATE_STOPPING }
90              
91             #-------------------------------------------------------------------------------
92             # Other accessors
93             #-------------------------------------------------------------------------------
94 8     8 1 29 sub name { $_[0]->{name} }
95 0     0 1 0 sub pid { $_[0]->{pid} }
96 0     0 1 0 sub exit_status { $_[0]->{exit_status} }
97 1     1 1 14 sub exit_code { $_[0]->{exit_code} }
98              
99             #-------------------------------------------------------------------------------
100             # Ensure the process is cleaned up when the object is garbage collected.
101             #-------------------------------------------------------------------------------
102             sub DESTROY {
103 0     0   0 my $self = shift;
104              
105             # Localize globals to avoid affecting global state during shutdown
106 0         0 local ($., $@, $!, $^E, $?);
107              
108 0 0 0     0 if ($self->{pid} && waitpid($self->{pid}, WNOHANG) == 0) {
109 0         0 kill 'KILL', $self->{pid};
110 0         0 waitpid $self->{pid}, 0;
111             }
112             }
113              
114             #-------------------------------------------------------------------------------
115             # Logs debug messages
116             #-------------------------------------------------------------------------------
117             sub debug {
118 43     43 0 89 my $self = shift;
119              
120 43 50       133 if ($ENV{IPC_SIMPLE_DEBUG}) {
121 0         0 my $msg = sprintf shift, @_;
122              
123 0         0 my ($pkg, $file, $line) = caller;
124 0   0     0 my $pid = $self->{pid} || '(ready)';
125 0         0 my $ts = time;
126              
127 0         0 warn "<$pkg:$line | $ts | pid:$pid> $msg\n";
128             }
129             }
130              
131             #-------------------------------------------------------------------------------
132             # Launch and helpers
133             #-------------------------------------------------------------------------------
134             sub launch {
135 5     5 1 11453 my $self = shift;
136              
137 5 50       19 if ($self->is_running) {
138 0         0 croak 'already running';
139             }
140              
141 5 50       21 if ($self->is_stopping) {
142 0         0 croak 'process is terminating';
143             }
144              
145 5         15 my $cmd = $self->{cmd};
146              
147 5         35 $self->debug('launching: %s', "@$cmd");
148              
149 5 50       31 my $pid = open3(my $in, my $out, my $err = gensym, @$cmd)
150             or croak $!;
151              
152 5         18574 $self->debug('process launched with pid %d', $pid);
153              
154 5         141 $self->run_state(STATE_RUNNING);
155              
156 5         24 $self->{exit_status} = undef;
157 5         38 $self->{exit_code} = undef;
158 5         34 $self->{kill_timer} = undef;
159 5         33 $self->{pid} = $pid;
160 5         83 $self->{handle_err} = $self->_build_input_handle($err, IPC_STDERR);
161 5         97 $self->{handle_out} = $self->_build_input_handle($out, IPC_STDOUT);
162 5         124 $self->{handle_in} = $self->_build_output_handle($in);
163 5         159 $self->{messages} = IPC::Simple::Channel->new;
164              
165 5         116 return 1;
166             }
167              
168             sub _build_output_handle {
169 5     5   20 my ($self, $fh) = @_;
170              
171             # set non-blocking
172 5         27 AnyEvent::fh_unblock($fh);
173              
174             my $handle = AnyEvent::Handle->new(
175             fh => $fh,
176 0     0   0 on_error => sub{ $self->_on_error(IPC_STDIN, @_) },
177 5         121 );
178              
179 5         410 return $handle;
180             }
181              
182             sub _build_input_handle {
183 10     10   82 my ($self, $fh, $type) = @_;
184              
185             # set non-blocking
186 10         134 AnyEvent::fh_unblock($fh);
187              
188             my $handle = AnyEvent::Handle->new(
189             fh => $fh,
190 0     0   0 on_eof => sub{ $self->terminate },
191 2     2   260 on_error => sub{ $self->_on_error($type, @_) },
192 0     0   0 on_read => sub{ $self->_on_read($type, @_) },
193 10         557 );
194              
195             # push an initial read to prime the queue
196 10         9598 $self->_push_read($handle, $type);
197              
198 10         1138 return $handle;
199             }
200              
201             sub _on_error {
202 2     2   6 my ($self, $type, $handle, $fatal, $msg) = @_;
203 2         10 $self->_queue_message(IPC_ERROR, $msg);
204              
205 2 50       6 if ($fatal) {
206 2         12 $self->terminate;
207             }
208             }
209              
210             sub _on_exit {
211 3     3   10 my ($self, $status) = @_;
212 3         24 undef $self->{kill_timer};
213 3         28 $self->run_state(STATE_READY);
214 3   100     20 $self->{exit_status} = $status || 0;
215 3         11 $self->{exit_code} = $self->{exit_status} >> 8;
216              
217             $self->debug('child (pid %s) exited with status %d (exit code: %d)',
218             $self->{pid} || '(no pid)',
219             $self->{exit_status},
220             $self->{exit_code},
221 3   50     19 );
222              
223             # May not be set yet if launch fails early enough
224 3 50       39 if ($self->{messages}) {
225 3         19 $self->{messages}->shutdown;
226             }
227             }
228              
229             sub _on_read {
230 0     0   0 my ($self, $type, $handle) = @_;
231 0         0 $self->debug('read event type=%s', $type);
232 0         0 $self->_push_read($handle, $type);
233             }
234              
235             sub _push_read {
236 10     10   43 my ($self, $handle, $type) = @_;
237             $handle->push_read(line => $self->{eol}, sub{
238 4     4   704 my ($handle, $line) = @_;
239 4         10 chomp $line;
240 4         16 $self->_queue_message($type, $line);
241 10         191 });
242             }
243              
244             sub _queue_message {
245 6     6   24 my ($self, $type, $msg) = @_;
246 6         20 $self->debug('buffered type=%s, msg="%s"', $type, $msg);
247              
248 6         56 my $message = IPC::Simple::Message->new(
249             source => $self,
250             type => $type,
251             message => $msg,
252             );
253              
254 6 100       25 if ($self->{recv_cb}) {
255 4         21 $self->{recv_cb}->($message);
256             } else {
257 2         9 $self->{messages}->put($message);
258             }
259             }
260              
261             #-------------------------------------------------------------------------------
262             # Send a signal to the process
263             #-------------------------------------------------------------------------------
264             sub signal {
265 5     5 1 18 my ($self, $signal) = @_;
266 5 50       24 if ($self->{pid}) {
267 5         20 $self->debug('sending %s to pid %d', $signal, $self->{pid});
268 5         342 kill $signal, $self->{pid};
269             }
270             }
271              
272             #-------------------------------------------------------------------------------
273             # Stopping the process and waiting on it to complete
274             #-------------------------------------------------------------------------------
275             sub terminate {
276 7     7 1 42 my $self = shift;
277 7         30 my $timeout = shift;
278              
279 7 100       20 if ($self->is_running) {
280 5         32 $self->signal('TERM');
281 5         56 $self->run_state(STATE_STOPPING);
282              
283 5         34 $self->{handle_in}->push_shutdown;
284 5         228 $self->{handle_out}->push_shutdown;
285 5         121 $self->{handle_err}->push_shutdown;
286              
287 5 100       188 if (defined $timeout) {
288             $self->{kill_timer} = AnyEvent->timer(
289             after => $timeout,
290             cb => sub{
291 0     0   0 $self->signal('KILL');
292 0         0 undef $self->{kill_timer};
293             },
294 3         107 );
295             }
296              
297 5 100       143 if ($self->{term_cb}) {
298 2         11 $self->{term_cb}->($self);
299             }
300             }
301             }
302              
303             sub join {
304 3     3 1 9 my $self = shift;
305              
306 3 50       12 return if $self->is_ready;
307              
308 3         22 $self->debug('waiting for process to exit, pid %d', $self->{pid});
309              
310 3         65 my $done = AnyEvent->condvar;
311              
312 3         644 my $timer; $timer = AnyEvent->timer(
313             after => 0,
314             interval => 0.01,
315             cb => sub{
316             # non-blocking waitpid returns 0 if the pid is still alive
317 3 50   3   355 if (waitpid($self->{pid}, WNOHANG) != 0) {
318 3         16 my $status = $?;
319              
320             # another waiter might have already called _on_exit
321 3 50       10 unless ($self->is_ready) {
322 3         16 $self->_on_exit($?);
323             }
324              
325 3         32 $done->send;
326             }
327             },
328 3         58 );
329              
330 3         139 $done->recv;
331             }
332              
333             #-------------------------------------------------------------------------------
334             # Messages
335             #-------------------------------------------------------------------------------
336             sub send {
337 1     1 1 8 my ($self, $msg) = @_;
338 1         5 $self->debug('sending "%s"', $msg);
339 1         13 $self->{handle_in}->push_write($msg . $self->{eol});
340 1         121 1;
341             }
342              
343             sub recv {
344 2     2 1 61 my ($self, $type) = @_;
345 2         7 $self->debug('waiting on message from pid %d', $self->{pid});
346 2         14 $self->{messages}->get;
347             }
348              
349             1;
350              
351             __END__