File Coverage

blib/lib/Parallel/ForkManager.pm
Criterion Covered Total %
statement 124 148 83.7
branch 46 70 65.7
condition 22 44 50.0
subroutine 26 33 78.7
pod 16 21 76.1
total 234 316 74.0


line stmt bran cond sub pod time code
1             package Parallel::ForkManager;
2             our $AUTHORITY = 'cpan:DLUX';
3             # ABSTRACT: A simple parallel processing fork manager
4             $Parallel::ForkManager::VERSION = '1.20';
5 106     106   5788175 use POSIX ":sys_wait_h";
  106         521633  
  106         644  
6 106     106   176020 use Storable qw(store retrieve);
  106         257537  
  106         5647  
7 106     106   638 use File::Spec;
  106         199  
  106         2160  
8 106     106   64242 use File::Temp ();
  106         1824752  
  106         2471  
9 106     106   721 use File::Path ();
  106         204  
  106         1307  
10 106     106   329 use Carp;
  106         124  
  106         4571  
11              
12 106     106   440 use strict;
  106         200  
  106         173640  
13              
14             sub new {
15 237     237 1 223363 my ($c,$processes,$tempdir)=@_;
16              
17 237 50       2144 my $h={
18             max_proc => $processes,
19             processes => {},
20             in_child => 0,
21             parent_pid => $$,
22             auto_cleanup => ($tempdir ? 0 : 1),
23             waitpid_blocking_sleep => 1,
24             };
25              
26              
27             # determine temporary directory for storing data structures
28             # add it to Parallel::ForkManager object so children can use it
29             # We don't let it clean up so it won't do it in the child process
30             # but we have our own DESTROY to do that.
31 237 50 33     1084 if (not defined($tempdir) or not length($tempdir)) {
32 237         737 $tempdir = File::Temp::tempdir(CLEANUP => 0);
33             }
34 237 50 33     71712 die qq|Temporary directory "$tempdir" doesn't exist or is not a directory.| unless (-e $tempdir && -d _); # ensure temp dir exists and is indeed a directory
35 237         1038 $h->{tempdir} = $tempdir;
36              
37 237   33     1479 return bless($h,ref($c)||$c);
38             };
39              
40             sub start {
41 3405     3405 1 81861 my ($s,$identification)=@_;
42              
43             die "Cannot start another process while you are in the child process"
44 3405 50       10552 if $s->{in_child};
45 3405   66     62796 while ($s->{max_proc} && ( keys %{ $s->{processes} } ) >= $s->{max_proc}) {
  6305         41636  
46 2900         13774 $s->on_wait;
47 2900 50       48011 $s->wait_one_child(defined $s->{on_wait_period} ? &WNOHANG : undef);
48             };
49 3405         29875 $s->wait_children;
50 3405 50       11064 if ($s->{max_proc}) {
51 3405         2471554 my $pid=fork();
52 3405 50       54749 die "Cannot fork: $!" if !defined $pid;
53 3405 100       55053 if ($pid) {
54 3306         132918 $s->{processes}->{$pid}=$identification;
55 3306         102520 $s->on_start($pid,$identification);
56             } else {
57 99 50       5859 $s->{in_child}=1 if !$pid;
58             }
59 3405         98445 return $pid;
60             } else {
61 0         0 $s->{processes}->{$$}=$identification;
62 0         0 $s->on_start($$,$identification);
63 0         0 return 0; # Simulating the child which returns 0
64             }
65             }
66              
67             sub start_child {
68 20     20 1 303 my $self = shift;
69 20         52 my $sub = pop;
70 20         111 my $identification = shift;
71              
72 20 100       51 $self->start( $identification ) # in the parent
73             # ... or the child
74             or $self->finish( 0, $sub->() );
75             }
76              
77              
78             sub finish {
79 98     98 1 11024227 my ($s, $x, $r)=@_;
80              
81 98 50       2282 if ( $s->{in_child} ) {
82 98 100       1720 if (defined($r)) { # store the child's data structure
83 12         1460 my $storable_tempfile = File::Spec->catfile($s->{tempdir}, 'Parallel-ForkManager-' . $s->{parent_pid} . '-' . $$ . '.txt');
84 12         201 my $stored = eval { return &store($r, $storable_tempfile); };
  12         398  
85              
86             # handle Storables errors, IE logcarp or carp returning undef, or die (via logcroak or croak)
87 12 50 33     6127 if (not $stored or $@) {
88 0         0 warn(qq|The storable module was unable to store the child's data structure to the temp file "$storable_tempfile": | . join(', ', $@));
89             }
90             }
91 98   50     6336 CORE::exit($x || 0);
92             }
93 0 0       0 if ($s->{max_proc} == 0) { # max_proc == 0
94 0         0 $s->on_finish($$, $x ,$s->{processes}->{$$}, 0, 0, $r);
95 0         0 delete $s->{processes}->{$$};
96             }
97 0         0 return 0;
98             }
99              
100             sub wait_children {
101 3405     3405 0 9318 my ($s)=@_;
102              
103 3405 100       5653 return if !keys %{$s->{processes}};
  3405         13239  
104 851         2517 my $kid;
105 851   33     6030 do {
      66        
106 1004         12760 $kid = $s->wait_one_child(&WNOHANG);
107             } while defined $kid and ( $kid > 0 or $kid < -1 ); # AS 5.6/Win32 returns negative PIDs
108             };
109              
110             *wait_childs=*wait_children; # compatibility
111             *reap_finished_children=*wait_children; # behavioral synonym for clarity
112              
113             sub wait_one_child {
114 4086     4086 0 2191146 my ($s,$flag)=@_;
115              
116 4086         5993 my $kid;
117 4086         6496 while (1) {
118 4086   100     57966 $kid = $s->_waitpid($flag||=0);
119              
120 4086 100       32810 last unless defined $kid;
121              
122 3233 50 33     46083 last if $kid == 0 || $kid == -1; # AS 5.6/Win32 returns negative PIDs
123 3233 50       21460 redo if !exists $s->{processes}->{$kid};
124 3233         19543 my $id = delete $s->{processes}->{$kid};
125              
126             # retrieve child data structure, if any
127 3233         8071 my $retrieved = undef;
128 3233         140519 my $storable_tempfile = File::Spec->catfile($s->{tempdir}, 'Parallel-ForkManager-' . $s->{parent_pid} . '-' . $kid . '.txt');
129 3233 100       126458 if (-e $storable_tempfile) { # child has option of not storing anything, so we need to see if it did or not
130 45         91 $retrieved = eval { return &retrieve($storable_tempfile); };
  45         383  
131              
132             # handle Storables errors
133 45 50 33     4700 if (not $retrieved or $@) {
134 0         0 warn(qq|The storable module was unable to retrieve the child's data structure from the temporary file "$storable_tempfile": | . join(', ', $@));
135             }
136              
137             # clean up after ourselves
138 45         2429 unlink $storable_tempfile;
139             }
140              
141 3233 50       56652 $s->on_finish( $kid, $? >> 8 , $id, $? & 0x7f, $? & 0x80 ? 1 : 0, $retrieved);
142 3233         9409 last;
143             }
144 4086         25079 $kid;
145             };
146              
147             sub wait_all_children {
148 135     135 1 2187330 my ($s)=@_;
149              
150 135         1257 while (keys %{ $s->{processes} }) {
  313         3279  
151 178         2101 $s->on_wait;
152 178 50       2961 $s->wait_one_child(defined $s->{on_wait_period} ? &WNOHANG : undef);
153             };
154             }
155              
156             *wait_all_childs=*wait_all_children; # compatibility;
157              
158 5     5 1 70 sub max_procs { $_[0]->{max_proc}; }
159              
160 0     0 1 0 sub is_child { $_[0]->{in_child} }
161              
162 0     0 1 0 sub is_parent { !$_[0]->{in_child} }
163              
164             sub running_procs {
165 4310     4310 1 8814 my $self = shift;
166              
167 4310         6450 my @pids = keys %{ $self->{processes} };
  4310         30788  
168 4310         29466 return @pids;
169             }
170              
171             sub wait_for_available_procs {
172 1     1 1 1556 my( $self, $nbr ) = @_;
173 1   50     9 $nbr ||= 1;
174              
175 1 50       4 croak "number of processes '$nbr' higher than the max number of processes (@{[ $self->max_procs ]})"
  0         0  
176             if $nbr > $self->max_procs;
177              
178 1         4 $self->wait_one_child until $self->max_procs - $self->running_procs >= $nbr;
179             }
180              
181             sub run_on_finish {
182 23     23 1 287 my ($s,$code,$pid)=@_;
183              
184 23   50     233 $s->{on_finish}->{$pid || 0}=$code;
185             }
186              
187             sub on_finish {
188 3233     3233 0 14096 my ($s,$pid,@par)=@_;
189              
190 3233 100 66     40855 my $code=$s->{on_finish}->{$pid} || $s->{on_finish}->{0} or return 0;
191 45         238 $code->($pid,@par);
192             };
193              
194             sub run_on_wait {
195 0     0 1 0 my ($s,$code, $period)=@_;
196              
197 0         0 $s->{on_wait}=$code;
198 0         0 $s->{on_wait_period} = $period;
199             }
200              
201             sub on_wait {
202 3078     3078 0 7009 my ($s)=@_;
203              
204 3078 50       12218 if(ref($s->{on_wait}) eq 'CODE') {
205 0         0 $s->{on_wait}->();
206 0 0       0 if (defined $s->{on_wait_period}) {
207 0 0   0   0 local $SIG{CHLD} = sub { } if ! defined $SIG{CHLD};
208             select undef, undef, undef, $s->{on_wait_period}
209 0         0 };
210             };
211             };
212              
213             sub run_on_start {
214 0     0 1 0 my ($s,$code)=@_;
215              
216 0         0 $s->{on_start}=$code;
217             }
218              
219             sub on_start {
220 3306     3306 0 48644 my ($s,@par)=@_;
221              
222 3306 50       21049 $s->{on_start}->(@par) if ref($s->{on_start}) eq 'CODE';
223             };
224              
225             sub set_max_procs {
226 0     0 1 0 my ($s, $mp)=@_;
227              
228 0         0 $s->{max_proc} = $mp;
229             }
230              
231             sub set_waitpid_blocking_sleep {
232 206     206 1 2112 my( $self, $period ) = @_;
233 206         745 $self->{waitpid_blocking_sleep} = $period;
234             }
235              
236             sub waitpid_blocking_sleep {
237 0     0 1 0 $_[0]->{waitpid_blocking_sleep};
238             }
239              
240             sub _waitpid { # Call waitpid() in the standard Unix fashion.
241 4086     4086   10957 my( $self, $flag ) = @_;
242              
243 4086 100       36540 return $flag ? $self->_waitpid_non_blocking : $self->_waitpid_blocking;
244             }
245              
246             sub _waitpid_non_blocking {
247 3201     3201   12782 my $self = shift;
248              
249 3201         10812 for my $pid ( $self->running_procs ) {
250 3946 100       80407 my $p = waitpid $pid, &WNOHANG or next;
251              
252 1252 100       9566 return $pid if $p != -1;
253              
254 2         80 warn "child process '$pid' disappeared. A call to `waitpid` outside of Parallel::ForkManager might have reaped it.\n";
255             # it's gone. let's clean the process entry
256 2         98 delete $self->{processes}{$pid};
257             }
258              
259 1951         5294 return;
260             }
261              
262             sub _waitpid_blocking {
263 3082     3082   7539 my $self = shift;
264              
265             # pseudo-blocking
266 3082 100       20314 if( my $sleep_period = $self->{waitpid_blocking_sleep} ) {
267 1099         3124 while() {
268 2197         22552 my $pid = $self->_waitpid_non_blocking;
269              
270 2197 100       12884 return $pid if defined $pid;
271              
272 1100 100       3405 return unless $self->running_procs;
273              
274 1098         1103054496 select undef, undef, undef, $sleep_period;
275             }
276             }
277              
278 1983         291507179 return waitpid -1, 0;
279             }
280              
281             sub DESTROY {
282 237     237   60249 my ($self) = @_;
283              
284 237 100 66     38976 if ($self->{auto_cleanup} && $self->{parent_pid} == $$ && -d $self->{tempdir}) {
      66        
285 137         75466 File::Path::remove_tree($self->{tempdir});
286             }
287             }
288              
289             1;
290              
291             __END__