File Coverage

blib/lib/Parallel/Forker.pm
Criterion Covered Total %
statement 78 183 42.6
branch 21 56 37.5
condition 9 47 19.1
subroutine 20 30 66.6
pod 21 24 87.5
total 149 340 43.8


line stmt bran cond sub pod time code
1             # See copyright, etc in below POD section.
2             ######################################################################
3              
4             package Parallel::Forker;
5             require 5.006;
6 34     34   2869579 use Carp qw(carp croak confess);
  34         438  
  34         1909  
7 34     34   227 use IO::File;
  34         53  
  34         3594  
8 34     34   12059 use Time::HiRes qw(usleep);
  34         26361  
  34         219  
9              
10 34     34   23257 use Parallel::Forker::Process;
  34         94  
  34         1171  
11 34     34   171 use strict;
  34         58  
  34         788  
12 34     34   140 use vars qw($Debug $VERSION);
  34         76  
  34         71584  
13              
14             $VERSION = '1.260';
15              
16             ######################################################################
17             #### CONSTRUCTOR
18              
19             sub new {
20 53     53 1 35006 my $class = shift;
21 53         1636 my $self = {
22             _activity => 1, # Optionally set true when a sig_child comes
23             _processes => {}, # All process objects, keyed by id
24             _labels => {}, # List of process objects, keyed by label
25             _runable => {}, # Process objects runable now, keyed by id
26             _running => {}, # Process objects running now, keyed *PID*
27             _run_after_eqn => undef,# Equation to eval to determine if ready to launch
28             _parent_pid => $$, # PID of initial process creating the forker
29             max_proc => undef, # Number processes to launch, <1=any, +=that number
30             poll_interval => 100*1000, # Poll interval in usec
31             use_sig_child => undef, # Default to not using SIGCHLD handler
32             @_
33             };
34 53   33     786 bless $self, ref($class)||$class;
35 53         211 return $self;
36             }
37              
38             #### ACCESSORS
39              
40             sub in_parent {
41 1     1 1 277 my $self = shift;
42 1         9 return $self->{_parent_pid}==$$;
43             }
44              
45             sub max_proc {
46 9     9 1 36 my $self = shift;
47 9 50       63 $self->{max_proc} = shift if $#_>=0;
48 9         18 return $self->{max_proc};
49             }
50              
51             sub poll_interval {
52 0     0 1 0 my $self = shift;
53 0 0       0 $self->{poll_interval} = shift if $#_>=0;
54 0         0 return $self->{poll_interval};
55             }
56              
57             sub use_sig_child {
58 891     891 1 1801 my $self = shift;
59 891 100       2937 $self->{use_sig_child} = shift if $#_>=0;
60 891         9087 return $self->{use_sig_child};
61             }
62              
63             sub running {
64 23     23 1 200 my $self = shift;
65 23         90 return (values %{$self->{_running}});
  23         190  
66             }
67              
68             sub running_sorted {
69 0     0 0 0 my $self = shift;
70 0         0 return (sort {$a->{name} cmp $b->{name}} values %{$self->{_running}});
  0         0  
  0         0  
71             }
72              
73             sub process {
74 0     0 1 0 my $self = shift;
75 0 0       0 confess "usage: \$fork->process(\$name)" unless scalar(@_) == 1;
76 0         0 return $self->{_processes}{$_[0]};
77             }
78              
79             sub processes {
80 52     52 1 326 my $self = shift;
81 52         108 return (values %{$self->{_processes}});
  52         508  
82             }
83              
84             sub processes_sorted {
85 42     42 1 51177 my $self = shift;
86 42         189 return (sort {$a->{name} cmp $b->{name}} values %{$self->{_processes}});
  812         2282  
  42         1127  
87             }
88              
89             sub state_stats {
90 0     0 1 0 my $self = shift;
91 0         0 my %stats = (idle=>0, ready=>0, running=>0, runable=>0,
92             done=>0, parerr=>0, reapable=>0);
93 0         0 map {$stats{$_->state}++} $self->processes;
  0         0  
94 0         0 return %stats;
95             }
96              
97             #### METHODS
98              
99             sub schedule {
100 414     414 1 28676 my $class = shift;
101 414         1662 return Parallel::Forker::Process->_new(_forkref=>$class,
102             @_);
103             }
104              
105             sub sig_child {
106             # Keep minimal to avoid coredumps
107 168 50   168 1 4036 return if !$_[0];
108 168         3744 $_[0]->{_activity} = 1;
109             }
110              
111             sub wait_all {
112 52     52 1 393 my $self = shift;
113 52         175 while ($self->is_any_left) {
114             #print "NRUNNING ", scalar ( (keys %{$self->{_running}}) ), "\n";
115 665         4682 $self->poll;
116 637         54934315 usleep $self->{poll_interval};
117             };
118             }
119              
120             sub reap_processes {
121 0     0 1 0 my $self = shift;
122              
123 0         0 my @reaped;
124 0         0 foreach my $process ($self->processes) {
125 0 0       0 next unless $process->is_reapable;
126 0         0 $process->reap;
127 0         0 push @reaped, $process;
128             }
129 0         0 return @reaped;
130             }
131              
132             sub is_any_left {
133 689     689 1 6441 my $self = shift;
134 689 100       2258 return 1 if ( (keys %{$self->{_runable}}) > 0 );
  689         6686  
135 575 100       1510 return 1 if ( (keys %{$self->{_running}}) > 0 );
  575         4857  
136             }
137              
138             sub find_proc_name {
139 426     426 1 683 my $self = shift;
140 426         816 my $name = shift;
141             # Returns list of processes matching the name or label
142 426 100       1290 if (exists $self->{_processes}{$name}) {
    100          
143 330         1521 return ($self->{_processes}{$name});
144             } elsif (exists $self->{_labels}{$name}) {
145 64         134 return @{$self->{_labels}{$name}};
  64         266  
146             }
147 32         124 return undef;
148             }
149              
150             our $_Warned_Use_Sig_Child;
151              
152             sub poll {
153 665     665 1 1518 my $self = shift;
154 665 100 100     4633 return if $self->use_sig_child && !$self->{_activity};
155 225 50       1135 if (!defined $self->use_sig_child) {
156 0 0 0     0 carp "%Warning: Forker object should be new'ed with use_sig_child=>0 or 1, "
157             if ($^W && !$_Warned_Use_Sig_Child);
158 0         0 $_Warned_Use_Sig_Child = 1;
159 0         0 $self->use_sig_child(0);
160             }
161              
162             # We don't have a loop around this any more, as we want to allow
163             # applications to do other work. We'd also need to be careful not to
164             # set _activity with no one runnable, as it would potentially cause a
165             # infinite loop.
166              
167 225         749 $self->{_activity} = 0;
168 225         607 my $nrunning = grep { not $_->poll } (values %{$self->{_running}});
  314         3845  
  225         2392  
169              
170 225 50 66     1394 if (!($self->{max_proc} && $nrunning >= $self->{max_proc})) {
171 225         498 foreach my $procref (sort {$a->{name} cmp $b->{name}} # Lanch in named order
  284         1348  
172 225         2086 values %{$self->{_runable}}) {
173 243 100 100     1864 last if ($self->{max_proc} && $nrunning >= $self->{max_proc});
174 223         2055 $procref->run;
175 195         3964 $nrunning++;
176             }
177             }
178             # If no one's running, we need _activity set to check for runable -> running
179             # transitions during the next call to poll().
180 197 100       3725 $self->{_activity} = 1 if !$nrunning;
181             }
182              
183             sub ready_all {
184 52     52 1 514 my $self = shift;
185 52         370 foreach my $procref ($self->processes) {
186 414 50       1583 $procref->ready if $procref->is_idle;
187             };
188             }
189              
190             sub kill_all {
191 0     0 1   my $self = shift;
192 0   0       my $signal = shift || 9;
193 0           foreach my $procref ($self->running_sorted) {
194 0           $procref->kill($signal);
195             };
196             }
197              
198             sub kill_tree_all {
199 0     0 1   my $self = shift;
200 0   0       my $signal = shift || 9;
201 0           foreach my $procref ($self->running_sorted) {
202 0           $procref->kill_tree($signal);
203             };
204             }
205              
206             sub write_tree {
207 0     0 1   my $self = shift;
208 0           my %params = (@_);
209 0 0         defined $params{filename} or croak "%Error: filename not specified,";
210              
211 0           my %did_print;
212 0           my $another_loop = 1;
213 0           my $level = 0;
214 0           my $line = 4;
215 0           my @lines;
216 0           while ($another_loop) {
217 0           $another_loop = 0;
218 0           $level++;
219             proc:
220 0           foreach my $procref ($self->processes_sorted) {
221 0           foreach my $ra (values %{$procref->{_after_parents}}) {
  0            
222 0 0 0       next proc if (($did_print{$ra->{name}}{level}||999) >= $level);
223             }
224 0 0         if (!$did_print{$procref->{name}}{level}) {
225 0           $did_print{$procref->{name}}{level} = $level;
226 0           $did_print{$procref->{name}}{line} = $line;
227 0           $another_loop = 1;
228 0           $lines[$line][0] = $procref->_write_tree_line($level,0);
229 0           $lines[$line+1][0] = $procref->_write_tree_line($level,1);
230 0           foreach my $ra (values %{$procref->{_after_parents}}) {
  0            
231             $lines[$line][$did_print{$ra->{name}}{line}]
232 0           = $procref->{_after_parents_op}{$ra->{name}};
233             }
234 0           $line+=2;
235 0 0         if ($Debug) {
236 0           $lines[$line++][0] = $procref->_write_tree_line($level,2);
237 0           $lines[$line++][0] = $procref->_write_tree_line($level,3);
238 0           $lines[$line++][0] = $procref->_write_tree_line($level,4);
239             }
240 0           $line++;
241             }
242             }
243             }
244 0           $line++;
245              
246 0           if (0) {
247             for (my $row=1; $row<$line; $row++) {
248             for (my $col=1; $col<$line; $col++) {
249             print ($lines[$row][$col]?1:0);
250             }
251             print "\n";
252             }
253             }
254              
255 0           for (my $col=1; $col<=$#lines; $col++) {
256 0           my $col_used_row_min;
257             my $col_used_row_max;
258 0           for (my $row=1; $row<=$#lines; $row++) {
259 0 0         if ($lines[$row][$col]) {
260 0           $col_used_row_min = min($col_used_row_min, $row);
261 0           $col_used_row_max = max($col_used_row_max, $row);
262             }
263             }
264 0 0         if ($col_used_row_min) {
265 0           $col_used_row_min = min($col_used_row_min, $col);
266 0           $col_used_row_max = max($col_used_row_max, $col);
267 0           for (my $row=$col_used_row_min; $row<=$col_used_row_max; $row++) {
268 0 0 0       $lines[$row][$col] ||= '<' if $row==$col;
269 0   0       $lines[$row][$col] ||= '|';
270             }
271 0           for (my $row=1; $row<=$#lines; $row++) {
272 0 0 0       if (($lines[$row][0]||" ") !~ /^ /) { # Line with text on it
273 0   0       $lines[$row][$col] ||= '-';
274             #$lines[$row][$col-1] ||= '-';
275             }
276              
277 0   0       $lines[$row][$col] ||= ' ';
278             #$lines[$row][$col-1] ||= ' ';
279             }
280             }
281             }
282              
283 0 0         my $fh = IO::File->new($params{filename},"w") or die "%Error: $! $params{filename},";
284 0           print $fh "Tree of process spawn requirements:\n";
285 0           print $fh " & Indicates the program it connects to must complete with ok status\n";
286 0           print $fh " before the command on this row is allowed to become RUNABLE\n";
287 0           print $fh " E As with &, but with error status\n";
288 0           print $fh " ^ As with &, but with error or ok status\n";
289 0           print $fh " O Ored condition, either completing starts proc\n";
290 0           print $fh "\n";
291 0           for (my $row=1; $row<=$#lines; $row++) {
292 0           my $line = "";
293 0           for (my $col=1; $col<$#lines; $col++) {
294 0   0       $line .= ($lines[$row][$col]||"");
295             }
296 0   0       $line .= $lines[$row][0]||"";
297 0           $line =~ s/\s+$//;
298 0           print $fh "$line\n"; #if $line !~ /^\s*$/;
299             }
300              
301 0           $fh->close;
302             }
303              
304             sub min {
305 0     0 0   my $rtn = shift;
306 0           foreach my $v (@_) {
307 0 0 0       $rtn = $v if !defined $rtn || (defined $v && $v < $rtn);
      0        
308             }
309 0           return $rtn;
310             }
311             sub max {
312 0     0 0   my $rtn = shift;
313 0           foreach my $v (@_) {
314 0 0 0       $rtn = $v if !defined $rtn || (defined $v && $v > $rtn);
      0        
315             }
316 0           return $rtn;
317             }
318              
319             1;
320             ######################################################################
321             =pod
322              
323             =head1 NAME
324              
325             Parallel::Forker - Parallel job forking and management
326              
327             =head1 SYNOPSIS
328              
329             use Parallel::Forker;
330             $Fork = new Parallel::Forker (use_sig_child=>1);
331             $SIG{CHLD} = sub { Parallel::Forker::sig_child($Fork); };
332             $SIG{TERM} = sub { $Fork->kill_tree_all('TERM') if $Fork && $Fork->in_parent; die "Quitting...\n"; };
333              
334             $Fork->schedule
335             (run_on_start => sub {print "child work here...";},
336             # run_on_start => \&child_subroutine, # Alternative: call a named sub.
337             run_on_finish => sub {print "parent cleanup here...";},
338             )->run;
339              
340             $Fork->wait_all; # Wait for all children to finish
341              
342             # More processes
343             my $p1 = $Fork->schedule(...)->ready;
344             my $p2 = $Fork->schedule(..., run_after=>[$p1])->ready;
345             $Fork->wait_all; # p1 will complete before p2 starts
346              
347             # Other functions
348             $Fork->poll; # Service any active children
349             foreach my $proc ($Fork->running) { # Loop on each running child
350              
351             while ($Fork->is_any_left) {
352             $Fork->poll;
353             usleep(10*1000);
354             }
355              
356             =head1 DESCRIPTION
357              
358             Parallel::Forker manages parallel processes that are either subroutines or
359             system commands. Forker supports most of the features in all the other
360             little packages out there, with the addition of being able to specify
361             complicated expressions to determine which processes run after others, or
362             run when others fail.
363              
364             Function names are loosely based on Parallel::ForkManager.
365              
366             The unique property of Parallel::Forker is the ability to schedule
367             processes based on expressions that are specified when the processes are
368             defined. For example:
369              
370             my $p1 = $Fork->schedule(..., label=>'p1');
371             my $p2 = $Fork->schedule(..., label=>'p2');
372             my $p3 = $Fork->schedule(..., run_after => ["p1 | p2"]);
373             my $p4 = $Fork->schedule(..., run_after => ["p1 & !p2"]);
374              
375             Process p3 is specified to run after process p1 *or* p2 has completed
376             successfully. Process p4 will run after p1 finishes successfully, and
377             process p2 has completed with bad exit status.
378              
379             For more examples, see the tests.
380              
381             =head1 METHODS
382              
383             =over 4
384              
385             =item $self->find_proc_name()
386              
387             Returns one or more Parallel::Forker::Process objects for the given name (one
388             object returned) or label (one or more objects returned). Returns undef if no
389             processes are found.
390              
391             =item $self->in_parent
392              
393             Return true if and only if called from the parent process (the one that
394             created the Forker object).
395              
396             =item $self->is_any_left
397              
398             Return true if any processes are running, or runnable (need to run).
399              
400             =item $self->kill_all()
401              
402             Send a signal to all running children. You probably want to call this only
403             from the parent process that created the Parallel::Forker object, wrap the
404             call in "if ($self->in_parent)."
405              
406             =item $self->kill_tree_all()
407              
408             Send a signal to all running children and their subchildren.
409              
410             =item $self->poll_interval()
411              
412             Set the time in microseconds between polls when using wait_all. Default is
413             100000 usec (100 milliseconds); smaller numbers may improve performance when
414             jobs complete quickly.
415              
416             =item $self->max_proc()
417              
418             Specify the maximum number of processes that the poll method will run at
419             any one time. Defaults to undef, which runs all possible jobs at once.
420             Max_proc takes effect when you schedule processes and mark them "ready,"
421             then rely on Parallel::Forker's poll method to move the processes from the
422             ready state to the run state. (You should not call ->run yourself, as this
423             starts a new process immediately, ignoring max_proc.)
424              
425             =item $self->new()
426              
427             Create a new manager object. There may be more than one manager in any
428             application, but applications taking advantage of the sig_child handler
429             should call every manager's C<sig_child> method in the application's
430             C<$SIG{CHLD}> handler.
431              
432             Parameters are passed by name as follows:
433              
434             =over 4
435              
436             =item max_proc => ()
437              
438             See the C<max_proc> object method.
439              
440             =item use_sig_child => ( 0 | 1 )
441              
442             See the C<use_sig_child> object method. This option must be specified to
443             prevent a warning.
444              
445             =back
446              
447             =item $self->poll
448              
449             See if any children need work, and service them. Start up to max_proc
450             processes that are "ready" by calling their run method. Non-blocking;
451             always returns immediately.
452              
453             =item $self->process()
454              
455             Return Parallel::Forker::Process object for the specified process name, or
456             undef if none is found. See also find_proc_name.
457              
458             =item $self->processes
459              
460             Return Parallel::Forker::Process objects for all processes.
461              
462             =item $self->processes_sorted
463              
464             Return Parallel::Forker::Process objects for all processes, sorted by name.
465              
466             =item $self->ready_all
467              
468             Mark all processes as ready for scheduling.
469              
470             =item $self->reap_processes
471              
472             Reap all processes which have no other processes waiting for them, and the
473             process is is_done or is_parerr. Returns list of processes reaped. This
474             reclaims memory for when a large number of processes are being created,
475             run, and destroyed.
476              
477             =item $self->running
478              
479             Return Parallel::Forker::Process objects for all processes that are
480             currently running.
481              
482             =item $self->schedule()
483              
484             Register a new process perhaps for later running. Returns a
485             Parallel::Forker::Process object. Parameters are passed by name as
486             follows:
487              
488             =over 4
489              
490             =item label
491              
492             Optional name to use in C<run_after> commands. Unlike C<name>, this may be
493             reused, in which case C<run_after> will wait on all commands with the given
494             label. Labels must contain only [a-zA-Z0-9_].
495              
496             =item name
497              
498             Optional name to use in C<run_after> commands. Note that names MUST be
499             unique! When not specified, a unique number will be assigned
500             automatically.
501              
502             =item run_on_start
503              
504             Subroutine reference to execute when the job begins, in the forked process.
505             The subroutine is called with one argument, a reference to the
506             Parallel::Forker::Process that is starting.
507              
508             If your callback is going to fork, you'd be advised to have the child:
509              
510             $SIG{ALRM} = 'DEFAULT';
511             $SIG{CHLD} = 'DEFAULT';
512              
513             This will prevent the child from inheriting the parent's handlers, and
514             possibly confusing any child calls to waitpid.
515              
516             =item run_on_finish
517              
518             Subroutine reference to execute when the job ends, in the master process.
519             The subroutine is called with two arguments, a reference to the
520             Parallel::Forker::Process that is finishing, and the exit status of the
521             child process. Note the exit status will only be correct if a CHLD signal
522             handler is installed.
523              
524             =item run_pre_start
525              
526             Subroutine reference to execute before forking the child, in the master
527             process. The subroutine is called with one argument, a reference to the
528             Parallel::Forker::Process that is starting.
529              
530             =item run_after
531              
532             A list reference of processes that must be completed before this process
533             can be runnable. You may pass a process object (from schedule), a process
534             name, or a process label. You may use "|" or "&" in a string to run this
535             process after ANY processes exit, or after ALL exit (the default.)
536             ! in front of a process name indicates to run if that process fails with
537             bad exit status. ^ in front of a process indicates to run if that process
538             succeeds OR fails.
539              
540             =back
541              
542             =item $self->sig_child
543              
544             Must be called in a C<$SIG{CHLD}> handler by the parent process if
545             C<use_sig_child> was called with a "true" value. If there are multiple
546             Parallel::Forker objects, each of their C<sig_child> methods must be called
547             in the C<$SIG{CHLD}> handler.
548              
549             =item $self->state_stats
550              
551             Return hash containing statistics with keys of state names, and values with
552             number of processes in each state.
553              
554             =item $self->use_sig_child( 0 | 1 )
555              
556             This should always be called with a 0 or 1. If you install a C<$SIG{CHLD}>
557             handler which calls your Parallel::Forker object's C<sig_child> method, you
558             should also turn on C<use_sig_child>, by calling it with a "true" argument.
559             Then, calls to C<poll> will do less work when there are no child
560             processes to be reaped. If not using the handler, call with 0 to prevent a
561             warning.
562              
563             =item $self->wait_all
564              
565             Wait until there are no running or runable jobs left.
566              
567             =item $self->write_tree(filename => )
568              
569             Write a dump of the process dependency tree to the given filename.
570              
571             =back
572              
573             =head1 DISTRIBUTION
574              
575             The latest version is available from CPAN and from
576             L.
577              
578             Copyright 2002-2020 by Wilson Snyder. This package is free software; you
579             can redistribute it and/or modify it under the terms of either the GNU
580             Lesser General Public License Version 3 or the Perl Artistic License
581             Version 2.0.
582              
583             =head1 AUTHORS
584              
585             Wilson Snyder
586              
587             =head1 SEE ALSO
588              
589             L<Parallel::Forker::Process>
590              
591             =cut
592             ######################################################################