File Coverage

blib/lib/Parallel/Jobs.pm
Criterion Covered Total %
statement 215 258 83.3
branch 98 154 63.6
condition 35 48 72.9
subroutine 19 20 95.0
pod 0 5 0.0
total 367 485 75.6


line stmt bran cond sub pod time code
1             package Parallel::Jobs;
2              
3             # Start a number of jobs in parallel. Caller is responsible for
4             # deciding how many to start at a time. Allow fine-grained control
5             # over stdin, stdout and stderr of every job.
6              
7 1     1   20840 use 5.006;
  1         4  
  1         50  
8 1     1   6 use strict;
  1         374  
  1         305  
9 1     1   7 use warnings;
  1         17  
  1         36  
10 1     1   6 use Exporter;
  1         1  
  1         54  
11 1     1   6 use Carp;
  1         1  
  1         75  
12 1     1   1223 use IPC::Open3;
  1         24863  
  1         93  
13 1     1   13 use File::Spec;
  1         2  
  1         28  
14 1     1   1235 use POSIX qw(:sys_wait_h);
  1         9597  
  1         8  
15 1     1   4580 use File::Temp qw(tempfile);
  1         36767  
  1         80  
16              
17 1         396 use vars qw(@ISA @EXPORT @EXPORT_OK $VERSION %pids %dead_pids
18 1     1   10 %stdout_handles %stderr_handles $debug @events);
  1         3  
19              
20             @ISA = qw(Exporter);
21             @EXPORT = qw();
22             @EXPORT_OK = qw(start_job watch_jobs);
23             $VERSION = 0.09;
24              
25             sub new_handle ();
26             sub is_our_handle ($);
27              
28             # If the first argument is a hashref, it contains zero or more of
29             # these parameters:
30             #
31             # stdin_file - give the job input from the specified file
32             # stdin_handle - give the job input from the specified file handle
33             # stdout_handle - send the job's stdout to the specified file handle
34             # stdout_capture - capture the stdout while monitoring running jobs
35             # stderr_handle - send the job's stderr to the specified file handle
36             # stderr_capture - capture the stderr while monitoring running jobs
37             #
38             # By default, the job gets input from /dev/null and stdout and stderr
39             # are unmodified.
40             #
41             # Remaining arguments, or all arguments if the first argument isn't a
42             # hashref, are the command to execute.
43             #
44             # Returns the PID of the started job, or undef on error.
45              
46             my($fh_counter) = 0;
47            
48             sub start_job {
49 27     27 0 48 my(%params);
50             my(@cmd);
51 0         0 my($stdin_handle, $stdout_handle, $stderr_handle);
52 0         0 my($capture_stdout, $capture_stderr);
53 0         0 my($pid);
54 0         0 my(@to_close);
55              
56 27 50       133 if ($_[0] eq __PACKAGE__) {
57             # Should we flame at the user for calling this function as a
58             # method instead of a function call? Not sure.
59 0         0 shift;
60             }
61              
62 27 50       310 if (! @_) {
    50          
    0          
63 0         0 carp(__PACKAGE__ . ": No arguments specified");
64 0         0 return undef;
65             }
66             elsif (ref $_[0] eq 'HASH') {
67 27         45 %params = %{shift @_};
  27         274  
68 27         115 @cmd = @_;
69             }
70             elsif (ref $_[0] eq 'SCALAR') {
71 0         0 @cmd = (${shift @_});
  0         0  
72 0         0 push(@cmd, @_);
73             }
74             else {
75 0         0 @cmd = @_;
76             }
77              
78 27 100       92 if ($params{'stdin_handle'}) {
79 1         13 $stdin_handle = new_handle;
80 1     1   9 no strict 'refs';
  1         2  
  1         145  
81 1 50       67 open($stdin_handle, '<&' . fileno($params{'stdin_handle'}))
82             or croak "Couldn't reopen fhandle $params{'stdin_handle'}: $!";
83 1         7 delete $params{'stdin_handle'};
84             }
85             else {
86 26 100       192 $params{'stdin_file'} = '/dev/null'
87             unless $params{'stdin_file'};
88             }
89              
90 27 100       118 if ($params{'stdin_file'}) {
91 26 50       61 if ($stdin_handle) {
92 0         0 carp(__PACKAGE__ . ": Multiple stdin sources specified");
93 0         0 return undef;
94             }
95 26         151 $stdin_handle = new_handle;
96 1     1   6 no strict 'refs';
  1         2  
  1         85  
97 26 50       3157 if (! open($stdin_handle, $params{'stdin_file'})) {
98 0         0 carp(__PACKAGE__ . ": open($params{'stdin_file'}): $!");
99 0         0 return undef;
100             }
101 26         101 delete $params{'stdin_file'};
102             }
103              
104 27         176 $stdin_handle = '<&' . $stdin_handle;
105              
106 27 100       94 if ($params{'stdout_handle'}) {
107 5         94 $stdout_handle = new_handle;
108 1     1   4 no strict 'refs';
  1         2  
  1         183  
109 5 50       166 open($stdout_handle, '>&' . fileno($params{'stdout_handle'}))
110             or croak "Couldn't reopen fhandle $params{'stdout_handle'}: $!";
111 5 50       16 warn("Stdout file descriptor (from handle) is " .
112             fileno($stdout_handle) . "\n") if ($debug);
113 5         9 push(@to_close, $stdout_handle);
114 5         15 $stdout_handle = '>&' . $stdout_handle;
115 5         19 delete $params{'stdout_handle'};
116             }
117              
118 27 100       98 if ($params{'stdout_capture'}) {
    100          
119 10 50       133 if ($stdout_handle) {
120 0         0 carp(__PACKAGE__ . ": Multiple stdout sources specified");
121 0         0 return undef;
122             }
123 10         23 delete $params{'stdout_capture'};
124 10         41 $stdout_handle = new_handle;
125 10         25 $capture_stdout = 1;
126             }
127             elsif (! $stdout_handle) {
128 12         25 $stdout_handle = ">&STDOUT";
129             }
130              
131 27 50       128 if ($params{'stderr_handle'}) {
132 0         0 $stderr_handle = new_handle;
133 1     1   27 no strict 'refs';
  1         8  
  1         7290  
134 0 0       0 open($stderr_handle, '>&' . fileno($params{'stderr_handle'}))
135             or croak "Couldn't reopen fhandle $params{'stderr_handle'}: $!";
136 0         0 push(@to_close, $stderr_handle);
137 0         0 $stderr_handle = '>&' . $stderr_handle;
138 0         0 delete $params{'stderr_handle'};
139             }
140              
141 27 100       105 if ($params{'stderr_capture'}) {
    50          
142 10 50       50 if ($stderr_handle) {
143 0         0 carp(__PACKAGE__ . ": Multiple stderr sources specified");
144 0         0 return undef;
145             }
146 10         25 delete $params{'stderr_capture'};
147 10         132 $stderr_handle = new_handle;
148 10         15 $capture_stderr = 1;
149             }
150             elsif (! $stderr_handle) {
151 17         35 $stderr_handle = ">&STDERR";
152             }
153              
154 27         60 $pid = eval {
155 27 0       73 warn("Calling open3(" .
    0          
    0          
    50          
156             ($stdin_handle ? $stdin_handle : "undef") . ", " .
157             ($stdout_handle ? $stdout_handle : "undef") . ", " .
158             ($stderr_handle ? $stderr_handle : "undef") . ", " .
159             "@cmd)\n") if ($debug);
160 27         131 open3($stdin_handle, $stdout_handle, $stderr_handle, @cmd);
161             };
162              
163 27 50       234760 if ($@) {
164 0         0 carp(__PACKAGE__ . ": $@");
165 0         0 return undef;
166             }
167              
168             # If we skipped the exec in open3, we're still in the Perl child.
169 27 50       186 return $pid unless $pid;
170              
171 27         401 $pids{$pid}++;
172 27 100       125 if ($capture_stdout) {
173 10         75 $stdout_handles{$pid} = $stdout_handle;
174 10 50       69 warn("Stdout fd of $pid is " . fileno($stdout_handles{$pid}) .
175             "\n") if ($debug);
176             }
177 27 100       122 if ($capture_stderr) {
178 10         125 $stderr_handles{$pid} = $stderr_handle;
179 10 50       40 warn("Stderr fd of $pid is " . fileno($stderr_handles{$pid}) .
180             "\n") if ($debug);
181             }
182              
183 27         102 map(close($_), @to_close);
184              
185 27         634 return $pid;
186             }
187              
188             my($fhnum) = 0;
189             my($fh_pat) = sprintf('^%s::fh\d+$', __PACKAGE__);
190              
191             sub new_handle () {
192 52     52 0 570 sprintf("%s::fh%d", __PACKAGE__, $fhnum++);
193             }
194              
195             sub is_our_handle ($) {
196 0     0 0 0 $_[0] =~ /$fh_pat/o;
197             }
198              
199             sub watch_jobs {
200 60     60 0 99 my(%args);
201             my(@pid_only);
202 0         0 my(%stdout_to_pid, %stderr_to_pid);
203 0         0 my($nfound, $waiting);
204 0         0 my($pid, $type);
205 0         0 my($ret);
206 0         0 my($rbits, $ebits);
207 60         211 my(@pids) = keys %pids;
208 60         123 my(@dead_pids) = keys %dead_pids;
209              
210 60 50       165 if (ref $_[0] eq 'HASH') {
211 0         0 %args = %{$_[0]};
  0         0  
212             }
213              
214 60 50       146 warn("watch_jobs: pids(@pids) dead_pids(@dead_pids) #events=",
215             scalar(@events), "\n") if ($debug);
216 60 50 66     243 if (! (@pids || @dead_pids || @events)) {
      66        
217 5         28 return ();
218             }
219              
220 55         134 foreach my $pid (@pids, @dead_pids) {
221 115 100 100     614 if (! ($stdout_handles{$pid} || $stderr_handles{$pid})) {
222 41         115 push(@pid_only, $pid);
223             }
224             else {
225 74 100       554 if ($stdout_handles{$pid}) {
226 53         243 $stdout_to_pid{$stdout_handles{$pid}} = $pid;
227             }
228 74 100       182 if ($stderr_handles{$pid}) {
229 52         187 $stderr_to_pid{$stderr_handles{$pid}} = $pid;
230             }
231             }
232             }
233              
234 55 50 33     335 if (! (@dead_pids || $args{nohang})) {
235 55 100       2167 if (@pid_only == @pids) {
236 18 100       132 if (@events) {
237 3         5 return(@{shift @events});
  3         23  
238             }
239              
240 15 50       102 warn "Waiting for @pids to exit\n" if ($debug);
241 15         5331292 $pid = wait;
242 15 50       87 if ($pid == -1) {
243             # This shouldn't happen. Perhaps the caller waited on
244             # processes himself. Shame.
245 0         0 return ();
246             }
247 15 50       46 warn "$pid exited (wait)\n" if ($debug);
248 15         70 delete $pids{$pid};
249 15         316 return($pid, 'EXIT', $?);
250             }
251             }
252              
253 37         67 foreach my $pid (@pids) {
254 77 100       636 if (waitpid($pid, WNOHANG) > 0) {
255 12 50       41 warn "$pid exited (waitpid)\n" if ($debug);
256 12         32 delete $pids{$pid};
257 12 100 100     485 if ($stdout_handles{$pid} || $stderr_handles{$pid}) {
258 9         18 $dead_pids{$pid}++;
259             }
260 12         101 push(@events, [$pid, 'EXIT', $?]);
261             }
262             }
263              
264 37         61 while (1) {
265 65         97 $rbits = '';
266 65         643 map((vec($rbits, fileno($_), 1) = 1),
267             keys %stdout_to_pid,
268             keys %stderr_to_pid);
269              
270 65 50       158 warn "Calling select() to wait for output from jobs\n" if ($debug);
271              
272 65 100 66     6015190 $nfound = select($rbits, undef, $ebits = $rbits,
273             (@events || $args{nohang}) ? 0 : undef);
274              
275 65 100       172 if (! $nfound) {
276 37 50 33     193 if (! (@events || $args{nohang})) {
277 0 0       0 die(__PACKAGE__ . ": Internal error: Select unexpectedly " .
278             "returned no pending data") if (! $nfound);
279             }
280 37         54 last;
281             }
282              
283 28   66     6060 ($waiting) = grep(vec($rbits, fileno($_), 1) || vec($ebits, fileno($_), 1),
284             keys %stdout_to_pid, keys %stderr_to_pid);
285              
286 28 50       90 die(__PACKAGE__ . ": Internal error: Select returned " .
287             "an unexpected bit") if (! $waiting);
288              
289 28 100       106 if (($pid = $stdout_to_pid{$waiting})) {
    50          
290 16         37 $type = 'STDOUT';
291             }
292             elsif (($pid = $stderr_to_pid{$waiting})) {
293 12         34 $type = 'STDERR';
294             }
295             else {
296 0         0 die(__PACKAGE__ . ": Internal error: unexpected file handle");
297             }
298              
299             # Read all pending data
300 28         51 my($nfound2);
301 28         41 my($size) = 1024;
302 28         55 my($data) = '';
303 28   100     36 do {
304 34         47 $rbits = '';
305 34         152 vec($rbits, fileno($waiting), 1) = 1;
306 34         278 $ret = sysread($waiting, $data, $size, length($data));
307 34         60 $size *= 2;
308 34         292 $nfound2 = select($rbits, undef, $ebits = $rbits, 0);
309             } while ($ret && $nfound2);
310              
311 28 100       80 if ($data ne '') {
312 8 50       23 warn "Got $type output ($data) from $pid\n" if ($debug);
313             }
314             else {
315             # Got EOF
316 20 50       52 warn "Got $type EOF from $pid\n" if ($debug);
317 20         277 close($waiting);
318 20 100       57 if ($type eq 'STDOUT') {
319 10         41 delete $stdout_to_pid{$stdout_handles{$pid}};
320 10         25 delete $stdout_handles{$pid};
321             }
322 20 100       54 if ($type eq 'STDERR') {
323 10         36 delete $stderr_to_pid{$stderr_handles{$pid}};
324 10         110 delete $stderr_handles{$pid};
325             }
326 20 100 66     343 if ($dead_pids{$pid} &&
      100        
327             ! ($stderr_handles{$pid} || $stdout_handles{$pid})) {
328 9         16 delete $dead_pids{$pid};
329             }
330             }
331              
332 28         162 push(@events, [$pid, $type, $data]);
333             }
334              
335 37 50       73 if (@events) {
336 37         40 return(@{shift @events});
  37         334  
337             }
338             else {
339 0         0 return ();
340             }
341             }
342              
343             # If the argument is true, operate silently.
344              
345             sub test ($) {
346 1     1 0 89 my($fh, $outfile) = tempfile(__PACKAGE__ . ".testXXXX");
347 1         418 my(@tests);
348              
349 1   33     8 $debug = ! (@_ && $_[0]);
350              
351 1         7 my(@commands) = (
352             [ 'true', ],
353             [ 'sleep', '1' ],
354             [ 'echo foo' ],
355             [ 'echo bar 1>&2' ],
356             ['sleep 1; echo baz; sleep 1; echo screlt' ],
357             );
358 1         9 @tests = ({}, {stdout_capture=>1}, {stderr_capture=>1},
359             {stdout_capture=>1,stderr_capture=>1});
360 1 50       4 if ($outfile) {
361 1         4 push(@tests, {stdout_handle=>$fh});
362             }
363              
364 1 50       3 if (! $debug) {
365 1     1   158 no warnings 'once';
  1         2  
  1         971  
366 1         12 my($devnull) = File::Spec->devnull;
367 1 50       37 open(DEVNULL, ">$devnull") or die "open($devnull): $!";
368 1         18 open(OLD_STDOUT, ">&STDOUT");
369 1         15 open(OLD_STDERR, ">&STDERR");
370             }
371              
372 1         4 foreach my $params (@tests) {
373 5         11 my(%params) = %{$params};
  5         42  
374 5         12 my($pid, $event, $data);
375              
376 5 50       15 if ($debug) {
377 0         0 print "Params:";
378 0         0 map((print " $_ => $params{$_}"), keys %params);
379 0         0 print "\n";
380             }
381             else {
382 5 100 100     83 if (! ($params{'stdout_handle'} || $params{'stdout_capture'})) {
383 2         57 open(STDOUT, ">&DEVNULL");
384             }
385 5 100 66     50 if (! ($params{'stderr_handle'} || $params{'stderr_capture'})) {
386 3         94 open(STDERR, ">&DEVNULL");
387             }
388             }
389              
390 5         16 foreach my $command (@commands) {
391 25         52 $pid = start_job($params, @{$command});
  25         194  
392 25 50       270 die if (! $pid);
393 25 50       236 print "Started @{$command} ($pid)\n" if ($debug);
  0         0  
394             }
395            
396 5         46 while (($pid, $event, $data) = watch_jobs()) {
397 53 50       255 print "pid=$pid, event=$event, data=$data\n" if ($debug);
398             }
399              
400 5 50       951 if (! $debug) {
401 5 100 100     58 if (! ($params{'stdout_handle'} || $params{'stdout_capture'})) {
402 2         93 open(STDOUT, ">&OLD_STDOUT");
403             }
404              
405 5 100 66     54 if (! ($params{'stderr_handle'} || $params{'stderr_capture'})) {
406 3         131 open(STDERR, ">&OLD_STDERR");
407             }
408             }
409             }
410              
411 1 50       20 if ($outfile) {
412 1         6 my($text);
413              
414 1         21 close($fh);
415              
416             {
417 1         3 local($/) = undef;
  1         13  
418 1 50       67 open(OUTFILE, $outfile) || die "open($outfile): $!";
419 1         39 $text = <OUTFILE>;
420 1         13 close(OUTFILE);
421             }
422              
423             # If we're on an OS where different file descriptors to the same
424             # file keep different seek pointers, then the data written by
425             # the earlier command above could be overwritten by the data
426             # written by the later command, so we need to detect that
427             # possibility, hence the regexp below.
428 1 50       31 if ($text !~ /^(?:foo\n)?baz\nscrelt\n$/) {
429 0         0 die "Unexpected data in output file: $text";
430             }
431              
432 1         19 start_job({stdin_file=>$outfile}, 'cat');
433 1         36 watch_jobs();
434              
435 1 50       216 open(OUTFILE, $outfile) || croak;
436              
437 1         22 start_job({stdin_handle=>*OUTFILE{IO}}, 'cat');
438 1         26 watch_jobs();
439              
440 1         152 unlink($outfile);
441             }
442             }
443            
444             1;
445              
446             ################ Documentation ################
447              
448             =head1 NAME
449              
450             Parallel::Jobs - run jobs in parallel with access to their stdout and stderr
451              
452             =head1 SYNOPSIS
453              
454             use Parallel::Jobs;
455             use Parallel::Jobs qw(start_job watch_jobs);
456              
457             $pid = Parallel::Jobs::start_job('cmd', ... args ...);
458             $pid = Parallel::Jobs::start_job('cmd ... args ...');
459             $pid = Parallel::Jobs::start_job({ stdin_file => filename |
460             stdin_handle => *HANDLE,
461             stdout_handle => *HANDLE |
462             stdout_capture => 1,
463             stderr_handle => *HANDLE |
464             stderr_capture => 1 },
465             ... cmd as above ...);
466              
467             ($pid, $event, $data) = Parallel::Jobs::watch_jobs();
468             ($pid, $event, $data) = Parallel::Jobs::watch_jobs({nohang => 1});
469              
470             =head1 DESCRIPTION
471              
472             The Parallel::Jobs module allows you to run multiple jobs in parallel
473             with fine-grained control over their stdin, stdout and stderr. That
474             control is the biggest difference between this module and others such
475             as Parallel::ForkManager.
476              
477             You can specify the command to run as a single string or as a list
478             specifying the command and its arguments, as in L<IPC::Open3>. If
479             your version of IPC::Open3 supports '-' as the command,
480             Parallel::Jobs::start_job() will fork to a Perl child in a manner
481             analogous to C<open(FOO, "-|")>.
482              
483             If your first argument is a reference to a hash, it can specify the
484             parameters shown above. By default, stdin for each job is set to
485             /dev/null and stdout and stderr are set to the stdout and stderr of
486             the calling process.
487              
488             If you specify stdin_handle, stdout_handle or stderr_handle, the
489             handle will be copied; the original handle will thus not be modified.
490              
491             Each time you call Parallel::Jobs::watch_jobs(), it will return the
492             process ID of the job with which an event has occurred, the event type,
493             and the data associated with that event. If there are no more jobs to
494             watch, watch_jobs() will return an empty list (undef in scalar
495             context). If you want to poll for pending events without hanging if there
496             are none currently available, pass a hash reference with the key "nohang" set to a true value.
497              
498             The relevant events are as follows:
499              
500             =over
501              
502             =item EXIT
503              
504             The indicated process has exited. The returned data is the value of
505             $? from the exited process. The process has already been waited for
506             (i.e., you don't need to do any cleanup on it).
507              
508             =item STDOUT
509              
510             Output has been received on stdout. The returned data is the output
511             that was received, or an empty string if EOF was received.
512              
513             =item STDERR
514              
515             Output has been received on stderr. The returned data is the output
516             that was received, or an empty string if EOF was received.
517              
518             =back
519              
520             Note that it is possible to receive STDOUT or STDERR events from a
521             process after its EXIT event, i.e., you may receive an EXIT event
522             before you've read all of a process's output.
523              
524             =head1 AUTHOR
525              
526             Jonathan Kamens E<lt>jik@kamens.usE<gt>
527              
528             =head1 CREDITS
529              
530             This module was written and is maintained by Jonathan Kamens
531             (originally E<lt>jik@worldwinner.comE<gt>, currently
532             E<lt>jik@kamens.usE<gt>). In addition, the following people
533             contributed bug fixes, enhancements, and/or useful suggestions:
534              
535             =over
536              
537             =item Paul GABORIT E<lt>gaborit@enstimac.frE<gt>
538              
539             =item Adam Spiers E<lt>perl@adamspiers.orgE<gt>
540              
541             =item Greg Lindahl E<lt>greg@blekko.comE<gt>
542              
543             =back
544              
545             =head1 COPYRIGHT AND LICENSE
546              
547             =over
548              
549             =item Copyright 2002-2003 by WorldWinner.com, Inc.
550              
551             =item Copyright 2012, 2013 Jonathan Kamens.
552              
553             =back
554              
555             This library is free software; you can redistribute it and/or modify
556             it under the same terms as Perl itself.
557              
558             =cut