File Coverage

blib/lib/Parallel/Forker.pm
Criterion Covered Total %
statement 78 180 43.3
branch 21 54 38.8
condition 9 47 19.1
subroutine 20 29 68.9
pod 20 23 86.9
total 148 333 44.4


line stmt bran cond sub pod time code
1             # See copyright, etc in below POD section.
2             ######################################################################
3              
4             package Parallel::Forker;
5             require 5.006;
6 34     34   2857859 use Carp qw(carp croak confess);
  34         367  
  34         1859  
7 34     34   189 use IO::File;
  34         50  
  34         4085  
8 34     34   12098 use Time::HiRes qw(usleep);
  34         27418  
  34         134  
9              
10 34     34   21727 use Parallel::Forker::Process;
  34         69  
  34         1306  
11 34     34   175 use strict;
  34         45  
  34         831  
12 34     34   157 use vars qw($Debug $VERSION);
  34         77  
  34         69116  
13              
14             $VERSION = '1.258';
15              
16             ######################################################################
17             #### CONSTRUCTOR
18              
19             sub new {
20 53     53 1 30208 my $class = shift;
21 53         1480 my $self = {
22             _activity => 1, # Optionally set true when a sig_child comes
23             _processes => {}, # All process objects, keyed by id
24             _labels => {}, # List of process objects, keyed by label
25             _runable => {}, # Process objects runable now, keyed by id
26             _running => {}, # Process objects running now, keyed *PID*
27             _run_after_eqn => undef,# Equation to eval to determine if ready to launch
28             _parent_pid => $$, # PID of initial process creating the forker
29             max_proc => undef, # Number processes to launch, <1=any, +=that number
30             use_sig_child => undef, # Default to not using SIGCHLD handler
31             @_
32             };
33 53   33     820 bless $self, ref($class)||$class;
34 53         232 return $self;
35             }
36              
37             #### ACCESSORS
38              
39             sub in_parent {
40 1     1 1 274 my $self = shift;
41 1         7 return $self->{_parent_pid}==$$;
42             }
43              
44             sub max_proc {
45 9     9 1 36 my $self = shift;
46 9 50       54 $self->{max_proc} = shift if $#_>=0;
47 9         18 return $self->{max_proc};
48             }
49              
50             sub use_sig_child {
51 876     876 1 1936 my $self = shift;
52 876 100       3323 $self->{use_sig_child} = shift if $#_>=0;
53 876         7505 return $self->{use_sig_child};
54             }
55              
56             sub running {
57 24     24 1 193 my $self = shift;
58 24         43 return (values %{$self->{_running}});
  24         352  
59             }
60              
61             sub running_sorted {
62 0     0 0 0 my $self = shift;
63 0         0 return (sort {$a->{name} cmp $b->{name}} values %{$self->{_running}});
  0         0  
  0         0  
64             }
65              
66             sub process {
67 0     0 1 0 my $self = shift;
68 0 0       0 confess "usage: \$fork->process(\$name)" unless scalar(@_) == 1;
69 0         0 return $self->{_processes}{$_[0]};
70             }
71              
72             sub processes {
73 52     52 1 136 my $self = shift;
74 52         119 return (values %{$self->{_processes}});
  52         555  
75             }
76              
77             sub processes_sorted {
78 42     42 1 50505 my $self = shift;
79 42         245 return (sort {$a->{name} cmp $b->{name}} values %{$self->{_processes}});
  770         2247  
  42         1050  
80             }
81              
82             sub state_stats {
83 0     0 1 0 my $self = shift;
84 0         0 my %stats = (idle=>0, ready=>0, running=>0, runable=>0,
85             done=>0, parerr=>0, reapable=>0);
86 0         0 map {$stats{$_->state}++} $self->processes;
  0         0  
87 0         0 return %stats;
88             }
89              
90             #### METHODS
91              
92             sub schedule {
93 414     414 1 29979 my $class = shift;
94 414         1475 return Parallel::Forker::Process->_new(_forkref=>$class,
95             @_);
96             }
97              
98             sub sig_child {
99             # Keep minimal to avoid coredumps
100 167 50   167 1 4137 return if !$_[0];
101 167         4232 $_[0]->{_activity} = 1;
102             }
103              
104             sub wait_all {
105 52     52 1 351 my $self = shift;
106 52         147 while ($self->is_any_left) {
107             #print "NRUNNING ", scalar ( (keys %{$self->{_running}}) ), "\n";
108 655         4454 $self->poll;
109 627         53429795 usleep 100*1000;
110             };
111             }
112              
113             sub reap_processes {
114 0     0 1 0 my $self = shift;
115              
116 0         0 my @reaped;
117 0         0 foreach my $process ($self->processes) {
118 0 0       0 next unless $process->is_reapable;
119 0         0 $process->reap;
120 0         0 push @reaped, $process;
121             }
122 0         0 return @reaped;
123             }
124              
125             sub is_any_left {
126 679     679 1 6437 my $self = shift;
127 679 100       1425 return 1 if ( (keys %{$self->{_runable}}) > 0 );
  679         7634  
128 566 100       3195 return 1 if ( (keys %{$self->{_running}}) > 0 );
  566         5932  
129             }
130              
131             sub find_proc_name {
132 426     426 1 715 my $self = shift;
133 426         774 my $name = shift;
134             # Returns list of processes matching the name or label
135 426 100       1192 if (exists $self->{_processes}{$name}) {
    100          
136 330         1168 return ($self->{_processes}{$name});
137             } elsif (exists $self->{_labels}{$name}) {
138 64         138 return @{$self->{_labels}{$name}};
  64         276  
139             }
140 32         300 return undef;
141             }
142              
143             our $_Warned_Use_Sig_Child;
144              
145             sub poll {
146 655     655 1 1444 my $self = shift;
147 655 100 100     3674 return if $self->use_sig_child && !$self->{_activity};
148 220 50       779 if (!defined $self->use_sig_child) {
149 0 0 0     0 carp "%Warning: Forker object should be new'ed with use_sig_child=>0 or 1, "
150             if ($^W && !$_Warned_Use_Sig_Child);
151 0         0 $_Warned_Use_Sig_Child = 1;
152 0         0 $self->use_sig_child(0);
153             }
154              
155             # We don't have a loop around this any more, as we want to allow
156             # applications to do other work. We'd also need to be careful not to
157             # set _activity with no one runnable, as it would potentially cause a
158             # infinite loop.
159              
160 220         604 $self->{_activity} = 0;
161 220         705 my $nrunning = grep { not $_->poll } (values %{$self->{_running}});
  313         3008  
  220         2180  
162              
163 220 50 66     1269 if (!($self->{max_proc} && $nrunning >= $self->{max_proc})) {
164 220         1413 foreach my $procref (sort {$a->{name} cmp $b->{name}} # Lanch in named order
  289         946  
165 220         2139 values %{$self->{_runable}}) {
166 239 100 100     2178 last if ($self->{max_proc} && $nrunning >= $self->{max_proc});
167 223         1929 $procref->run;
168 195         4199 $nrunning++;
169             }
170             }
171             # If no one's running, we need _activity set to check for runable -> running
172             # transitions during the next call to poll().
173 192 100       3789 $self->{_activity} = 1 if !$nrunning;
174             }
175              
176             sub ready_all {
177 52     52 1 575 my $self = shift;
178 52         205 foreach my $procref ($self->processes) {
179 414 50       1532 $procref->ready if $procref->is_idle;
180             };
181             }
182              
183             sub kill_all {
184 0     0 1   my $self = shift;
185 0   0       my $signal = shift || 9;
186 0           foreach my $procref ($self->running_sorted) {
187 0           $procref->kill($signal);
188             };
189             }
190              
191             sub kill_tree_all {
192 0     0 1   my $self = shift;
193 0   0       my $signal = shift || 9;
194 0           foreach my $procref ($self->running_sorted) {
195 0           $procref->kill_tree($signal);
196             };
197             }
198              
199             sub write_tree {
200 0     0 1   my $self = shift;
201 0           my %params = (@_);
202 0 0         defined $params{filename} or croak "%Error: filename not specified,";
203              
204 0           my %did_print;
205 0           my $another_loop = 1;
206 0           my $level = 0;
207 0           my $line = 4;
208 0           my @lines;
209 0           while ($another_loop) {
210 0           $another_loop = 0;
211 0           $level++;
212             proc:
213 0           foreach my $procref ($self->processes_sorted) {
214 0           foreach my $ra (values %{$procref->{_after_parents}}) {
  0            
215 0 0 0       next proc if (($did_print{$ra->{name}}{level}||999) >= $level);
216             }
217 0 0         if (!$did_print{$procref->{name}}{level}) {
218 0           $did_print{$procref->{name}}{level} = $level;
219 0           $did_print{$procref->{name}}{line} = $line;
220 0           $another_loop = 1;
221 0           $lines[$line][0] = $procref->_write_tree_line($level,0);
222 0           $lines[$line+1][0] = $procref->_write_tree_line($level,1);
223 0           foreach my $ra (values %{$procref->{_after_parents}}) {
  0            
224             $lines[$line][$did_print{$ra->{name}}{line}]
225 0           = $procref->{_after_parents_op}{$ra->{name}};
226             }
227 0           $line+=2;
228 0 0         if ($Debug) {
229 0           $lines[$line++][0] = $procref->_write_tree_line($level,2);
230 0           $lines[$line++][0] = $procref->_write_tree_line($level,3);
231 0           $lines[$line++][0] = $procref->_write_tree_line($level,4);
232             }
233 0           $line++;
234             }
235             }
236             }
237 0           $line++;
238              
239 0           if (0) {
240             for (my $row=1; $row<$line; $row++) {
241             for (my $col=1; $col<$line; $col++) {
242             print ($lines[$row][$col]?1:0);
243             }
244             print "\n";
245             }
246             }
247              
248 0           for (my $col=1; $col<=$#lines; $col++) {
249 0           my $col_used_row_min;
250             my $col_used_row_max;
251 0           for (my $row=1; $row<=$#lines; $row++) {
252 0 0         if ($lines[$row][$col]) {
253 0           $col_used_row_min = min($col_used_row_min, $row);
254 0           $col_used_row_max = max($col_used_row_max, $row);
255             }
256             }
257 0 0         if ($col_used_row_min) {
258 0           $col_used_row_min = min($col_used_row_min, $col);
259 0           $col_used_row_max = max($col_used_row_max, $col);
260 0           for (my $row=$col_used_row_min; $row<=$col_used_row_max; $row++) {
261 0 0 0       $lines[$row][$col] ||= '<' if $row==$col;
262 0   0       $lines[$row][$col] ||= '|';
263             }
264 0           for (my $row=1; $row<=$#lines; $row++) {
265 0 0 0       if (($lines[$row][0]||" ") !~ /^ /) { # Line with text on it
266 0   0       $lines[$row][$col] ||= '-';
267             #$lines[$row][$col-1] ||= '-';
268             }
269              
270 0   0       $lines[$row][$col] ||= ' ';
271             #$lines[$row][$col-1] ||= ' ';
272             }
273             }
274             }
275              
276 0 0         my $fh = IO::File->new($params{filename},"w") or die "%Error: $! $params{filename},";
277 0           print $fh "Tree of process spawn requirements:\n";
278 0           print $fh " & Indicates the program it connects to must complete with ok status\n";
279 0           print $fh " before the command on this row is allowed to become RUNABLE\n";
280 0           print $fh " E As with &, but with error status\n";
281 0           print $fh " ^ As with &, but with error or ok status\n";
282 0           print $fh " O Ored condition, either completing starts proc\n";
283 0           print $fh "\n";
284 0           for (my $row=1; $row<=$#lines; $row++) {
285 0           my $line = "";
286 0           for (my $col=1; $col<$#lines; $col++) {
287 0   0       $line .= ($lines[$row][$col]||"");
288             }
289 0   0       $line .= $lines[$row][0]||"";
290 0           $line =~ s/\s+$//;
291 0           print $fh "$line\n"; #if $line !~ /^\s*$/;
292             }
293              
294 0           $fh->close;
295             }
296              
297             sub min {
298 0     0 0   my $rtn = shift;
299 0           foreach my $v (@_) {
300 0 0 0       $rtn = $v if !defined $rtn || (defined $v && $v < $rtn);
      0        
301             }
302 0           return $rtn;
303             }
304             sub max {
305 0     0 0   my $rtn = shift;
306 0           foreach my $v (@_) {
307 0 0 0       $rtn = $v if !defined $rtn || (defined $v && $v > $rtn);
      0        
308             }
309 0           return $rtn;
310             }
311              
312             1;
313             ######################################################################
314             =pod
315              
316             =head1 NAME
317              
318             Parallel::Forker - Parallel job forking and management
319              
320             =head1 SYNOPSIS
321              
322             use Parallel::Forker;
323             $Fork = new Parallel::Forker (use_sig_child=>1);
324             $SIG{CHLD} = sub { Parallel::Forker::sig_child($Fork); };
325             $SIG{TERM} = sub { $Fork->kill_tree_all('TERM') if $Fork && $Fork->in_parent; die "Quitting...\n"; };
326              
327             $Fork->schedule
328             (run_on_start => sub {print "child work here...";},
329             # run_on_start => \&child_subroutine, # Alternative: call a named sub.
330             run_on_finish => sub {print "parent cleanup here...";},
331             )->run;
332              
333             $Fork->wait_all; # Wait for all children to finish
334              
335             # More processes
336             my $p1 = $Fork->schedule(...)->ready;
337             my $p2 = $Fork->schedule(..., run_after=>[$p1])->ready;
338             $Fork->wait_all; # p1 will complete before p2 starts
339              
340             # Other functions
341             $Fork->poll; # Service any active children
342             foreach my $proc ($Fork->running) { # Loop on each running child
343              
344             while ($Fork->is_any_left) {
345             $Fork->poll;
346             usleep(10*1000);
347             }
348              
349             =head1 DESCRIPTION
350              
351             Parallel::Forker manages parallel processes that are either subroutines or
352             system commands. Forker supports most of the features in all the other
353             little packages out there, with the addition of being able to specify
354             complicated expressions to determine which processes run after others, or
355             run when others fail.
356              
357             Function names are loosely based on Parallel::ForkManager.
358              
359             The unique property of Parallel::Forker is the ability to schedule
360             processes based on expressions that are specified when the processes are
361             defined. For example:
362              
363             my $p1 = $Fork->schedule(..., label=>'p1');
364             my $p2 = $Fork->schedule(..., label=>'p2');
365             my $p3 = $Fork->schedule(..., run_after => ["p1 | p2"]);
366             my $p4 = $Fork->schedule(..., run_after => ["p1 & !p2"]);
367              
368             Process p3 is specified to run after process p1 *or* p2 have completed
369             successfully. Process p4 will run after p1 finishes successfully, and
370             process p2 has completed with bad exit status.
371              
372             For more examples, see the tests.
373              
374             =head1 METHODS
375              
376             =over 4
377              
378             =item $self->find_proc_name(<name>)
379              
380             Returns one or more Parallel::Forker::Process objects for the given name (one
381             object returned) or label (one or more objects returned). Returns undef if no
382             processes are found.
383              
384             =item $self->in_parent
385              
386             Return true if and only if called from the parent process (the one that
387             created the Forker object).
388              
389             =item $self->is_any_left
390              
391             Return true if any processes are running, or runnable (need to run).
392              
393             =item $self->kill_all(<signal>)
394              
395             Send a signal to all running children. You probably want to call this only
396             from the parent process that created the Parallel::Forker object, wrap the
397             call in "if ($self->in_parent)."
398              
399             =item $self->kill_tree_all(<signal>)
400              
401             Send a signal to all running children and their subchildren.
402              
403             =item $self->max_proc(<number>)
404              
405             Specify the maximum number of processes that the poll method will run at
406             any one time. Defaults to undef, which runs all possible jobs at once.
407             Max_proc takes effect when you schedule processes and mark them "ready,"
408             then rely on Parallel::Forker's poll method to move the processes from the
409             ready state to the run state. (You should not call ->run yourself, as this
410             starts a new process immediately, ignoring max_proc.)
411              
412             =item $self->new(<parameters>)
413              
414             Create a new manager object. There may be more than one manager in any
415             application, but applications taking advantage of the sig_child handler
416             should call every manager's C<sig_child> method in the application's
417             C<SIGCHLD> handler.
418              
419             Parameters are passed by name as follows:
420              
421             =over 4
422              
423             =item max_proc => (<number>)
424              
425             See the C<max_proc> object method.
426              
427             =item use_sig_child => ( 0 | 1 )
428              
429             See the C<use_sig_child> object method. This option must be specified to
430             prevent a warning.
431              
432             =back
433              
434             =item $self->poll
435              
436             See if any children need work, and service them. Start up to max_proc
437             processes that are "ready" by calling their run method. Non-blocking;
438             always returns immediately.
439              
440             =item $self->process(<name>)
441              
442             Return Parallel::Forker::Process object for the specified process name, or
443             undef if none is found. See also find_proc_name.
444              
445             =item $self->processes
446              
447             Return Parallel::Forker::Process objects for all processes.
448              
449             =item $self->processes_sorted
450              
451             Return Parallel::Forker::Process objects for all processes, sorted by name.
452              
453             =item $self->ready_all
454              
455             Mark all processes as ready for scheduling.
456              
457             =item $self->reap_processes
458              
459             Reap all processes which have no other processes waiting for them, and the
460             process is is_done or is_parerr. Returns list of processes reaped. This
461             reclaims memory for when a large number of processes are being created,
462             run, and destroyed.
463              
464             =item $self->running
465              
466             Return Parallel::Forker::Process objects for all processes that are
467             currently running.
468              
469             =item $self->schedule(<parameters>)
470              
471             Register a new process perhaps for later running. Returns a
472             Parallel::Forker::Process object. Parameters are passed by name as
473             follows:
474              
475             =over 4
476              
477             =item label
478              
479             Optional name to use in C<run_after> commands. Unlike C<name>, this may be
480             reused, in which case C<run_after> will wait on all commands with the given
481             label. Labels must contain only [a-zA-Z0-9_].
482              
483             =item name
484              
485             Optional name to use in C<run_after> commands. Note that names MUST be
486             unique! When not specified, a unique number will be assigned
487             automatically.
488              
489             =item run_on_start
490              
491             Subroutine reference to execute when the job begins, in the forked process.
492             The subroutine is called with one argument, a reference to the
493             Parallel::Forker::Process that is starting.
494              
495             If your callback is going to fork, you'd be advised to have the child:
496              
497             $SIG{ALRM} = 'DEFAULT';
498             $SIG{CHLD} = 'DEFAULT';
499              
500             This will prevent the child from inheriting the parent's handlers, and
501             possibly confusing any child calls to waitpid.
502              
503             =item run_on_finish
504              
505             Subroutine reference to execute when the job ends, in the master process.
506             The subroutine is called with two arguments, a reference to the
507             Parallel::Forker::Process that is finishing, and the exit status of the
508             child process. Note the exit status will only be correct if a CHLD signal
509             handler is installed.
510              
511             =item run_pre_start
512              
513             Subroutine reference to execute before forking the child, in the master
514             process. The subroutine is called with one argument, a reference to the
515             Parallel::Forker::Process that is starting.
516              
517             =item run_after
518              
519             A list reference of processes that must be completed before this process
520             can be runnable. You may pass a process object (from schedule), a process
521             name, or a process label. You may use "|" or "&" in a string to run this
522             process after ANY processes exit, or after ALL exit (the default.)
523             ! in front of a process name indicates to run if that process fails with
524             bad exit status. ^ in front of a process indicates to run if that process
525             succeeds OR fails.
526              
527             =back
528              
529             =item $self->sig_child
530              
531             Must be called in a C<$SIG{CHLD}> handler by the parent process if
532             C<use_sig_child> was called with a "true" value. If there are multiple
533             Parallel::Forker objects each of their C<sig_child> methods must be called
534             in the C<$SIG{CHLD}> handler.
535              
536             =item $self->state_stats
537              
538             Return hash containing statistics with keys of state names, and values with
539             number of processes in each state.
540              
541             =item $self->use_sig_child( 0 | 1 )
542              
543             This should always be called with a 0 or 1. If you install a C<$SIG{CHLD}>
544             handler which calls your Parallel::Forker object's C<sig_child> method, you
545             should also turn on C<use_sig_child>, by calling it with a "true" argument.
546             Then, calls to C<poll()> will do less work when there are no children
547             processes to be reaped. If not using the handler call with 0 to prevent a
548             warning.
549              
550             =item $self->wait_all
551              
552             Wait until there are no running or runable jobs left.
553              
554             =item $self->write_tree(filename => <filename>)
555              
556             Print a dump of the execution tree.
557              
558             =back
559              
560             =head1 DISTRIBUTION
561              
562             The latest version is available from CPAN and from
563             L<https://www.veripool.org/parallel-forker>.
564              
565             Copyright 2002-2020 by Wilson Snyder. This package is free software; you
566             can redistribute it and/or modify it under the terms of either the GNU
567             Lesser General Public License Version 3 or the Perl Artistic License
568             Version 2.0.
569              
570             =head1 AUTHORS
571              
572             Wilson Snyder
573              
574             =head1 SEE ALSO
575              
576             L<Parallel::Forker::Process>
577              
578             =cut
579             ######################################################################