File Coverage

blib/lib/Parallel/ForkManager.pm
Criterion Covered Total %
statement 129 152 84.8
branch 42 62 67.7
condition 18 33 54.5
subroutine 29 35 82.8
pod 14 21 66.6
total 232 303 76.5


line stmt bran cond sub pod time code
1             package Parallel::ForkManager;
2             our $AUTHORITY = 'cpan:DLUX';
3             # ABSTRACT: A simple parallel processing fork manager
4             $Parallel::ForkManager::VERSION = '2.01'; # TRIAL
5 106     106   6476035 use POSIX ":sys_wait_h";
  106         618285  
  106         677  
6 106     106   229588 use Storable ();
  106         289584  
  106         2514  
7 106     106   718 use File::Spec;
  106         201  
  106         2256  
8 106     106   77473 use File::Temp ();
  106         1915451  
  106         2763  
9 106     106   719 use File::Path ();
  106         133  
  106         1468  
10 106     106   508 use Carp;
  106         205  
  106         5049  
11              
12 106     106   43354 use Parallel::ForkManager::Child;
  106         320  
  106         3217  
13              
14 106     106   722 use strict;
  106         289  
  106         1709  
15              
16 106     106   45917 use Moo;
  106         233042  
  106         524  
17              
18             has max_proc => (
19             is => 'ro',
20             required => 1,
21             writer => 'set_max_procs',
22             );
23              
24             has processes => (
25             is => 'ro',
26             default => sub { {} },
27             );
28              
29             has parent_pid => (
30             is => 'ro',
31             default => sub { $$ },
32             );
33              
34             has auto_cleanup => (
35             is => 'rw',
36             default => sub { 1 },
37             );
38              
39             has waitpid_blocking_sleep => (
40             is =>'ro',
41             writer => 'set_waitpid_blocking_sleep',
42             default => sub { 1 },
43             );
44              
45             has tempdir => (
46             is => 'ro',
47             default => sub {
48             File::Temp::tempdir(CLEANUP => 0);
49             },
50             trigger => sub {
51             my( $self, $dir ) = @_;
52              
53             die qq|Temporary directory "$dir" doesn't exist or is not a directory.|
54             unless -d $dir;
55              
56             $self->auto_cleanup(0);
57             },
58             );
59              
60             has child_role => (
61             is => 'ro',
62             default => 'Parallel::ForkManager::Child',
63             );
64              
65 0     0 1 0 sub is_child { 0 }
66 0     0 1 0 sub is_parent { 1 }
67              
68              
69             sub BUILDARGS {
70 237     237 0 738101 my ( undef, @args ) = @_;
71 237         589 my %args;
72              
73 237 50       2543 if ( $args[0] =~ /^\d+$/ ) {
74 237         837 $args{max_proc} = shift @args;
75 237 50       877 $args{tempdir} = shift @args if @args;
76             }
77             else {
78 0         0 %args = @args;
79             }
80              
81 237         4034 return \%args;
82             }
83              
84             sub start {
85 3405     3405 1 104096 my ($s,$identification)=@_;
86              
87 3405   66     71427 while ($s->{max_proc} && ( keys %{ $s->{processes} } ) >= $s->{max_proc}) {
  6235         35763  
88 2830         51969 $s->on_wait;
89 2830 50       31164 $s->wait_one_child(defined $s->{on_wait_period} ? &WNOHANG : undef);
90             };
91 3405         22648 $s->wait_children;
92 3405 50       11081 if ($s->{max_proc}) {
93 3405         3182277 my $pid=fork();
94 3405 50       74632 die "Cannot fork: $!" if !defined $pid;
95 3405 100       67085 if ($pid) { # in parent
96 3306         148250 $s->{processes}->{$pid}=$identification;
97 3306         673812 $s->on_start($pid,$identification);
98             } else {
99 99         17929 Role::Tiny->apply_roles_to_object( $s, $s->child_role );
100             }
101 3405         274552 return $pid;
102             }
103            
104             # non-forking mode
105 0         0 $s->{processes}->{$$}=$identification;
106 0         0 $s->on_start($$,$identification);
107 0         0 return 0; # Simulating the child which returns 0
108             }
109              
110             sub start_child {
111 20     20 1 441 my $self = shift;
112 20         56 my $sub = pop;
113 20         122 my $identification = shift;
114              
115 20 100       297 $self->start( $identification ) # in the parent
116             # ... or the child
117             or $self->finish( 0, $sub->() );
118             }
119              
120              
121             sub finish {
122 0     0 1 0 my ($s, $x, $r)=@_;
123              
124 0 0       0 if ($s->{max_proc} == 0) { # nofork
125 0         0 $s->on_finish($$, $x ,$s->{processes}->{$$}, 0, 0, $r);
126 0         0 delete $s->{processes}->{$$};
127             }
128              
129 0         0 return 0;
130             }
131              
132             sub wait_children {
133 3405     3405 0 12699 my ($s)=@_;
134              
135 3405 100       6816 return if !keys %{$s->{processes}};
  3405         12523  
136 851         2506 my $kid;
137 851   33     2727 do {
      66        
138 1076         17505 $kid = $s->wait_one_child(&WNOHANG);
139             } while defined $kid and ( $kid > 0 or $kid < -1 ); # AS 5.6/Win32 returns negative PIDs
140             };
141              
142             *wait_childs=*wait_children; # compatibility
143             *reap_finished_children=*wait_children; # behavioral synonym for clarity
144              
145             # TODO document the method
146             sub retrieve {
147 3235     3235 1 18241 my( $self, $kid ) = @_;
148              
149 3235         8262 my $retrieved = undef;
150              
151 3235         182739 my $storable_tempfile = File::Spec->catfile($self->{tempdir}, 'Parallel-ForkManager-' . $self->{parent_pid} . '-' . $kid . '.txt');
152              
153 3235 100       146597 if (-e $storable_tempfile) { # child has option of not storing anything, so we need to see if it did or not
154 45         159 $retrieved = eval { Storable::retrieve($storable_tempfile) };
  45         332  
155              
156             # handle Storables errors
157 45 50 33     4877 if (not $retrieved or $@) {
158 0         0 warn(qq|The storable module was unable to retrieve the child's data structure from the temporary file "$storable_tempfile": | . join(', ', $@));
159             }
160              
161             # clean up after ourselves
162 45         2599 unlink $storable_tempfile;
163             }
164              
165 3235         17376 return $retrieved;
166             }
167              
168             sub store {
169 98     98 1 838 my( $self, $data ) = @_;
170              
171 98 100       1168 return unless defined $data;
172              
173 12         886 my $storable_tempfile = File::Spec->catfile($self->{tempdir}, 'Parallel-ForkManager-' . $self->{parent_pid} . '-' . $$ . '.txt');
174 12         149 my $stored = eval { return Storable::store($data, $storable_tempfile); };
  12         372  
175              
176             # handle Storables errors, IE logcarp or carp returning undef, or die (via logcroak or croak)
177 12 50 33     4398 if (not $stored or $@) {
178 0         0 warn(qq|The storable module was unable to store the child's data structure to the temp file "$storable_tempfile": | . join(', ', $@));
179             }
180             }
181              
182              
183              
184             sub wait_one_child {
185 4088     4088 0 2251496 my ($s,$flag)=@_;
186              
187 4088         7644 my $kid;
188 4088         6907 while (1) {
189 4088   100     82488 $kid = $s->_waitpid($flag||=0);
190              
191 4088 100       44328 last unless defined $kid;
192              
193 3235 50 33     27370 last if $kid == 0 || $kid == -1; # AS 5.6/Win32 returns negative PIDs
194 3235 50       28697 redo if !exists $s->{processes}->{$kid};
195 3235         18477 my $id = delete $s->{processes}->{$kid};
196              
197             # retrieve child data structure, if any
198 3235         20837 my $retrieved = $s->retrieve($kid);
199              
200 3235 50       76374 $s->on_finish( $kid, $? >> 8 , $id, $? & 0x7f, $? & 0x80 ? 1 : 0, $retrieved);
201 3235         12808 last;
202             }
203 4088         36781 $kid;
204             };
205              
206             sub wait_all_children {
207 135     135 1 2272877 my ($s)=@_;
208              
209 135         767 while (keys %{ $s->{processes} }) {
  313         3195  
210 178         2573 $s->on_wait;
211 178 50       2166 $s->wait_one_child(defined $s->{on_wait_period} ? &WNOHANG : undef);
212             };
213             }
214              
215             *wait_all_childs=*wait_all_children; # compatibility;
216              
217 5     5 1 104 sub max_procs { $_[0]->max_proc }
218              
219             sub running_procs {
220 4382     4382 1 13844 my $self = shift;
221              
222 4382         22526 my @pids = keys %{ $self->{processes} };
  4382         29318  
223 4382         35406 return @pids;
224             }
225              
226             sub wait_for_available_procs {
227 1     1 1 1429 my( $self, $nbr ) = @_;
228 1   50     34 $nbr ||= 1;
229              
230 1 50       11 croak "number of processes '$nbr' higher than the max number of processes (@{[ $self->max_procs ]})"
  0         0  
231             if $nbr > $self->max_procs;
232              
233 1         10 $self->wait_one_child until $self->max_procs - $self->running_procs >= $nbr;
234             }
235              
236             sub run_on_finish {
237 23     23 1 353 my ($s,$code,$pid)=@_;
238              
239 23   50     198 $s->{on_finish}->{$pid || 0}=$code;
240             }
241              
242             sub on_finish {
243 3235     3235 0 23582 my ($s,$pid,@par)=@_;
244              
245 3235 100 66     50043 my $code=$s->{on_finish}->{$pid} || $s->{on_finish}->{0} or return 0;
246 45         314 $code->($pid,@par);
247             };
248              
249             sub run_on_wait {
250 0     0 1 0 my ($s,$code, $period)=@_;
251              
252 0         0 $s->{on_wait}=$code;
253 0         0 $s->{on_wait_period} = $period;
254             }
255              
256             sub on_wait {
257 3008     3008 0 9636 my ($s)=@_;
258              
259 3008 50       11307 if(ref($s->{on_wait}) eq 'CODE') {
260 0         0 $s->{on_wait}->();
261 0 0       0 if (defined $s->{on_wait_period}) {
262 0 0   0   0 local $SIG{CHLD} = sub { } if ! defined $SIG{CHLD};
263             select undef, undef, undef, $s->{on_wait_period}
264 0         0 };
265             };
266             };
267              
268             sub run_on_start {
269 0     0 1 0 my ($s,$code)=@_;
270              
271 0         0 $s->{on_start}=$code;
272             }
273              
274             sub on_start {
275 3306     3306 0 150788 my ($s,@par)=@_;
276              
277 3306 50       81588 $s->{on_start}->(@par) if ref($s->{on_start}) eq 'CODE';
278             };
279              
280             sub _waitpid { # Call waitpid() in the standard Unix fashion.
281 4088     4088   10349 my( $self, $flag ) = @_;
282              
283 4088 100       19255 return $flag ? $self->_waitpid_non_blocking : $self->_waitpid_blocking;
284             }
285              
286             sub _waitpid_non_blocking {
287 3273     3273   23004 my $self = shift;
288              
289 3273         24662 for my $pid ( $self->running_procs ) {
290 3979 100       301835 my $p = waitpid $pid, &WNOHANG or next;
291              
292 1324 100       17355 return $pid if $p != -1;
293              
294 2         81 warn "child process '$pid' disappeared. A call to `waitpid` outside of Parallel::ForkManager might have reaped it.\n";
295             # it's gone. let's clean the process entry
296 2         118 delete $self->{processes}{$pid};
297             }
298              
299 1951         21822 return;
300             }
301              
302             sub _waitpid_blocking {
303 3012     3012   5504 my $self = shift;
304              
305             # pseudo-blocking
306 3012 100       36514 if( my $sleep_period = $self->{waitpid_blocking_sleep} ) {
307 1099         67145 while() {
308 2197         31112 my $pid = $self->_waitpid_non_blocking;
309              
310 2197 100       12405 return $pid if defined $pid;
311              
312 1100 100       2607 return unless $self->running_procs;
313              
314 1098         1103173466 select undef, undef, undef, $sleep_period;
315             }
316             }
317              
318 1913         404612125 return waitpid -1, 0;
319             }
320              
321             sub DEMOLISH {
322 237     237 0 527954 my $self = shift;
323              
324 106     106   309593 no warnings 'uninitialized';
  106         383  
  106         9455  
325              
326             File::Path::remove_tree($self->{tempdir})
327 237 100 66     83256 if $self->{auto_cleanup} and $self->{parent_pid} == $$ and -d $self->{tempdir};
      66        
328             }
329              
330             1;
331              
332             __END__