File Coverage

blib/lib/Parallel/ForkManager.pm
Criterion Covered Total %
statement 129 152 84.8
branch 42 62 67.7
condition 18 33 54.5
subroutine 29 35 82.8
pod 14 21 66.6
total 232 303 76.5


line stmt bran cond sub pod time code
1             package Parallel::ForkManager;
2             our $AUTHORITY = 'cpan:DLUX';
3             # ABSTRACT: A simple parallel processing fork manager
4             $Parallel::ForkManager::VERSION = '2.02';
5 106     106   7392556 use POSIX ":sys_wait_h";
  106         701302  
  106         533  
6 106     106   237681 use Storable ();
  106         337817  
  106         2921  
7 106     106   769 use File::Spec;
  106         206  
  106         2721  
8 106     106   78386 use File::Temp ();
  106         2241866  
  106         3032  
9 106     106   832 use File::Path ();
  106         135  
  106         2551  
10 106     106   963 use Carp;
  106         127  
  106         6063  
11              
12 106     106   50196 use Parallel::ForkManager::Child;
  106         311  
  106         3141  
13              
14 106     106   738 use strict;
  106         223  
  106         2012  
15              
16 106     106   59943 use Moo;
  106         287128  
  106         950  
17              
18             has max_proc => (
19             is => 'ro',
20             required => 1,
21             writer => 'set_max_procs',
22             );
23              
24             has processes => (
25             is => 'ro',
26             default => sub { {} },
27             );
28              
29             has parent_pid => (
30             is => 'ro',
31             default => sub { $$ },
32             );
33              
34             has auto_cleanup => (
35             is => 'rw',
36             default => sub { 1 },
37             );
38              
39             has waitpid_blocking_sleep => (
40             is =>'ro',
41             writer => 'set_waitpid_blocking_sleep',
42             default => sub { 1 },
43             );
44              
45             has tempdir => (
46             is => 'ro',
47             default => sub {
48             File::Temp::tempdir(CLEANUP => 0);
49             },
50             trigger => sub {
51             my( $self, $dir ) = @_;
52              
53             die qq|Temporary directory "$dir" doesn't exist or is not a directory.|
54             unless -d $dir;
55              
56             $self->auto_cleanup(0);
57             },
58             );
59              
60             has child_role => (
61             is => 'ro',
62             default => 'Parallel::ForkManager::Child',
63             );
64              
65 0     0 1 0 sub is_child { 0 }
66 0     0 1 0 sub is_parent { 1 }
67              
68              
69             sub BUILDARGS {
70 237     237 0 929963 my ( undef, @args ) = @_;
71 237         697 my %args;
72              
73 237 50       3237 if ( $args[0] =~ /^\d+$/ ) {
74 237         929 $args{max_proc} = shift @args;
75 237 50       979 $args{tempdir} = shift @args if @args;
76             }
77             else {
78 0         0 %args = @args;
79             }
80              
81 237         4711 return \%args;
82             }
83              
84             sub start {
85 3405     3405 1 108462 my ($s,$identification)=@_;
86              
87 3405   66     104518 while ($s->{max_proc} && ( keys %{ $s->{processes} } ) >= $s->{max_proc}) {
  6210         74001  
88 2805         18903 $s->on_wait;
89 2805 50       35059 $s->wait_one_child(defined $s->{on_wait_period} ? &WNOHANG : undef);
90             };
91 3405         33478 $s->wait_children;
92 3405 50       14371 if ($s->{max_proc}) {
93 3405         3140049 my $pid=fork();
94 3405 50       74749 die "Cannot fork: $!" if !defined $pid;
95 3405 100       76146 if ($pid) { # in parent
96 3306         178864 $s->{processes}->{$pid}=$identification;
97 3306         107907 $s->on_start($pid,$identification);
98             } else {
99 99         19991 Role::Tiny->apply_roles_to_object( $s, $s->child_role );
100             }
101 3405         339211 return $pid;
102             }
103            
104             # non-forking mode
105 0         0 $s->{processes}->{$$}=$identification;
106 0         0 $s->on_start($$,$identification);
107 0         0 return 0; # Simulating the child which returns 0
108             }
109              
110             sub start_child {
111 20     20 1 644 my $self = shift;
112 20         58 my $sub = pop;
113 20         138 my $identification = shift;
114              
115 20 100       344 $self->start( $identification ) # in the parent
116             # ... or the child
117             or $self->finish( 0, $sub->() );
118             }
119              
120              
121             sub finish {
122 0     0 1 0 my ($s, $x, $r)=@_;
123              
124 0 0       0 if ($s->{max_proc} == 0) { # nofork
125 0         0 $s->on_finish($$, $x ,$s->{processes}->{$$}, 0, 0, $r);
126 0         0 delete $s->{processes}->{$$};
127             }
128              
129 0         0 return 0;
130             }
131              
132             sub wait_children {
133 3405     3405 0 12843 my ($s)=@_;
134              
135 3405 100       7297 return if !keys %{$s->{processes}};
  3405         14895  
136 851         3138 my $kid;
137 851   33     3225 do {
      66        
138 1102         19398 $kid = $s->wait_one_child(&WNOHANG);
139             } while defined $kid and ( $kid > 0 or $kid < -1 ); # AS 5.6/Win32 returns negative PIDs
140             };
141              
142             *wait_childs=*wait_children; # compatibility
143             *reap_finished_children=*wait_children; # behavioral synonym for clarity
144              
145             # TODO document the method
146             sub retrieve {
147 3236     3236 1 19537 my( $self, $kid ) = @_;
148              
149 3236         11655 my $retrieved = undef;
150              
151 3236         194311 my $storable_tempfile = File::Spec->catfile($self->{tempdir}, 'Parallel-ForkManager-' . $self->{parent_pid} . '-' . $kid . '.txt');
152              
153 3236 100       177147 if (-e $storable_tempfile) { # child has option of not storing anything, so we need to see if it did or not
154 45         347 $retrieved = eval { Storable::retrieve($storable_tempfile) };
  45         498  
155              
156             # handle Storables errors
157 45 50 33     7707 if (not $retrieved or $@) {
158 0         0 warn(qq|The storable module was unable to retrieve the child's data structure from the temporary file "$storable_tempfile": | . join(', ', $@));
159             }
160              
161             # clean up after ourselves
162 45         63001 unlink $storable_tempfile;
163             }
164              
165 3236         16291 return $retrieved;
166             }
167              
168             sub store {
169 98     98 1 1296 my( $self, $data ) = @_;
170              
171 98 100       2011 return unless defined $data;
172              
173 12         1459 my $storable_tempfile = File::Spec->catfile($self->{tempdir}, 'Parallel-ForkManager-' . $self->{parent_pid} . '-' . $$ . '.txt');
174 12         151 my $stored = eval { return Storable::store($data, $storable_tempfile); };
  12         425  
175              
176             # handle Storables errors, IE logcarp or carp returning undef, or die (via logcroak or croak)
177 12 50 33     5972 if (not $stored or $@) {
178 0         0 warn(qq|The storable module was unable to store the child's data structure to the temp file "$storable_tempfile": | . join(', ', $@));
179             }
180             }
181              
182              
183              
184             sub wait_one_child {
185 4089     4089 0 2303303 my ($s,$flag)=@_;
186              
187 4089         7952 my $kid;
188 4089         8822 while (1) {
189 4089   100     63001 $kid = $s->_waitpid($flag||=0);
190              
191 4089 100       78132 last unless defined $kid;
192              
193 3236 50 33     56570 last if $kid == 0 || $kid == -1; # AS 5.6/Win32 returns negative PIDs
194 3236 50       25659 redo if !exists $s->{processes}->{$kid};
195 3236         23920 my $id = delete $s->{processes}->{$kid};
196              
197             # retrieve child data structure, if any
198 3236         39737 my $retrieved = $s->retrieve($kid);
199              
200 3236 50       90821 $s->on_finish( $kid, $? >> 8 , $id, $? & 0x7f, $? & 0x80 ? 1 : 0, $retrieved);
201 3236         12108 last;
202             }
203 4089         38056 $kid;
204             };
205              
206             sub wait_all_children {
207 135     135 1 2298937 my ($s)=@_;
208              
209 135         1398 while (keys %{ $s->{processes} }) {
  313         3907  
210 178         1860 $s->on_wait;
211 178 50       2846 $s->wait_one_child(defined $s->{on_wait_period} ? &WNOHANG : undef);
212             };
213             }
214              
215             *wait_all_childs=*wait_all_children; # compatibility;
216              
217 5     5 1 87 sub max_procs { $_[0]->max_proc }
218              
219             sub running_procs {
220 4408     4408 1 14249 my $self = shift;
221              
222 4408         24204 my @pids = keys %{ $self->{processes} };
  4408         58563  
223 4408         49625 return @pids;
224             }
225              
226             sub wait_for_available_procs {
227 1     1 1 1400 my( $self, $nbr ) = @_;
228 1   50     16 $nbr ||= 1;
229              
230 1 50       5 croak "number of processes '$nbr' higher than the max number of processes (@{[ $self->max_procs ]})"
  0         0  
231             if $nbr > $self->max_procs;
232              
233 1         4 $self->wait_one_child until $self->max_procs - $self->running_procs >= $nbr;
234             }
235              
236             sub run_on_finish {
237 23     23 1 335 my ($s,$code,$pid)=@_;
238              
239 23   50     229 $s->{on_finish}->{$pid || 0}=$code;
240             }
241              
242             sub on_finish {
243 3236     3236 0 20975 my ($s,$pid,@par)=@_;
244              
245 3236 100 66     53993 my $code=$s->{on_finish}->{$pid} || $s->{on_finish}->{0} or return 0;
246 45         446 $code->($pid,@par);
247             };
248              
249             sub run_on_wait {
250 0     0 1 0 my ($s,$code, $period)=@_;
251              
252 0         0 $s->{on_wait}=$code;
253 0         0 $s->{on_wait_period} = $period;
254             }
255              
256             sub on_wait {
257 2983     2983 0 9663 my ($s)=@_;
258              
259 2983 50       12329 if(ref($s->{on_wait}) eq 'CODE') {
260 0         0 $s->{on_wait}->();
261 0 0       0 if (defined $s->{on_wait_period}) {
262 0 0   0   0 local $SIG{CHLD} = sub { } if ! defined $SIG{CHLD};
263             select undef, undef, undef, $s->{on_wait_period}
264 0         0 };
265             };
266             };
267              
268             sub run_on_start {
269 0     0 1 0 my ($s,$code)=@_;
270              
271 0         0 $s->{on_start}=$code;
272             }
273              
274             sub on_start {
275 3306     3306 0 99359 my ($s,@par)=@_;
276              
277 3306 50       32330 $s->{on_start}->(@par) if ref($s->{on_start}) eq 'CODE';
278             };
279              
280             sub _waitpid { # Call waitpid() in the standard Unix fashion.
281 4089     4089   16034 my( $self, $flag ) = @_;
282              
283 4089 100       24288 return $flag ? $self->_waitpid_non_blocking : $self->_waitpid_blocking;
284             }
285              
286             sub _waitpid_non_blocking {
287 3299     3299   13464 my $self = shift;
288              
289 3299         21688 for my $pid ( $self->running_procs ) {
290 4008 100       123371 my $p = waitpid $pid, &WNOHANG or next;
291              
292 1350 100       15052 return $pid if $p != -1;
293              
294 2         129 warn "child process '$pid' disappeared. A call to `waitpid` outside of Parallel::ForkManager might have reaped it.\n";
295             # it's gone. let's clean the process entry
296 2         167 delete $self->{processes}{$pid};
297             }
298              
299 1951         8486 return;
300             }
301              
302             sub _waitpid_blocking {
303 2987     2987   7969 my $self = shift;
304              
305             # pseudo-blocking
306 2987 100       21977 if( my $sleep_period = $self->{waitpid_blocking_sleep} ) {
307 1099         11450 while() {
308 2197         46861 my $pid = $self->_waitpid_non_blocking;
309              
310 2197 100       15385 return $pid if defined $pid;
311              
312 1100 100       3654 return unless $self->running_procs;
313              
314 1098         1103282629 select undef, undef, undef, $sleep_period;
315             }
316             }
317              
318 1888         440619021 return waitpid -1, 0;
319             }
320              
321             sub DEMOLISH {
322 237     237 0 597971 my $self = shift;
323              
324 106     106   370506 no warnings 'uninitialized';
  106         386  
  106         11108  
325              
326             File::Path::remove_tree($self->{tempdir})
327 237 100 66     105943 if $self->{auto_cleanup} and $self->{parent_pid} == $$ and -d $self->{tempdir};
      66        
328             }
329              
330             1;
331              
332             __END__