File Coverage

blib/lib/Parallel/ForkControl.pm
Criterion Covered Total %
statement 16 18 88.8
branch n/a
condition n/a
subroutine 6 6 100.0
pod n/a
total 22 24 91.6


line stmt bran cond sub pod time code
1             #
2             # This is a nonsucking forking module.
3             # Coded by:
4             # Brad Lhotsky
5             # Contributions by:
6             # Mark Thomas
7             #
8             package Parallel::ForkControl;
9 2     2   15952 use strict;
  2         6  
  2         87  
10 2     2   11 use warnings;
  2         4  
  2         207  
11              
12 2     2   2904 use POSIX qw/:signal_h :errno_h :sys_wait_h/;
  2         16912  
  2         21  
13 2     2   9757 use Storable qw(freeze thaw);
  2         9685  
  2         200  
14 2     2   4127 use Try::Tiny;
  2         4152  
  2         338  
15 2     2   1761 use CHI;
  0            
  0            
16              
17             our $AUTOLOAD;
18             our $VERSION = 0.5;
19              
20             use constant TRUE => 1;
21             use constant FALSE => 0;
22              
23             use constant DEFAULT => 0;
24             use constant PERMISSION => 1;
25             use constant PERMIT_ALL => 'init/set/get/copy';
26              
27             ##################
28             # Debug constants
29             use constant DB_OFF => 0;
30             use constant DB_INFO => 1;
31             use constant DB_LOW => 2;
32             use constant DB_MED => 3;
33             use constant DB_HIGH => 4;
34              
35             {
36             # private data members
37             my %_attributes = (
38             # Name # defaults # permissions
39             '_name' => [ 'Unnamed Child', PERMIT_ALL ],
40             '_processtimeout' => [ 120, PERMIT_ALL ],
41             '_maxkids' => [ 5, PERMIT_ALL ],
42             '_minkids' => [ 1, PERMIT_ALL ],
43             '_maxload' => [ 4.50, PERMIT_ALL ],
44             '_maxmem' => [ 10.0, PERMIT_ALL ], # non functional
45             '_maxcpu' => [ 25.0, PERMIT_ALL ],
46             '_method' => [ 'cycle', PERMIT_ALL ],
47             '_watchcount' => [ TRUE, PERMIT_ALL ],
48             '_watchload' => [ FALSE, PERMIT_ALL ],
49             '_watchmem' => [ FALSE, PERMIT_ALL ], # non functional
50             '_watchcpu' => [ FALSE, PERMIT_ALL ], # non functional
51             '_parentpid' => [ $$, 'get/set' ],
52             '_code' => [ undef, 'init/get/set' ],
53             '_debug' => [ DB_OFF, 'init/get/set' ],
54             '_check_at' => [ 2, PERMIT_ALL ],
55             '_checked' => [ 0, 'get/init' ],
56             '_accounting' => [ FALSE, PERMIT_ALL ],
57             '_results' => [ undef, 'get' ],
58             '_trackargs' => [ FALSE, PERMIT_ALL ]
59             );
60            
61             my %_KIDS=();
62             my $_KIDS=0;
63             # private member accessors
64             sub _attributes {
65             # return an array of our attributes
66             return keys %_attributes;
67             }
68             sub _default {
69             # return the default for a set attribute
70             my ($self,$attr) = @_;
71             $attr =~ tr/[A-Z]/[a-z]/;
72             $attr =~ s/^\s*_?/_/;
73             return unless exists $_attributes{$attr};
74             return $_attributes{$attr}->[DEFAULT];
75             }
76             sub _can {
77             # return TRUE if we can $perm the $attr
78             my ($self,$perm,$attr) = @_;
79             $attr =~ tr/[A-Z]/[a-z]/;
80             $attr =~ s/^\s*_?/_/;
81             return unless exists $_attributes{$attr};
82             $perm =~ tr/[A-Z]/[a-z/;
83             return TRUE if $_attributes{$attr}->[PERMISSION] =~ /$perm/;
84             return FALSE;
85             }
86             sub _kidstarted {
87             # keep records of our children
88             my ($self,$kid,@args) = @_;
89             $self->_dbmsg(DB_LOW,"CHILD: $kid STARTING");
90             #
91             # use time() here to implement the process time out
92             $_KIDS{$kid} = time;
93              
94             $self->{_results}{$kid} = {
95             status => 'running',
96             signature => undef,
97             result => undef,
98             exitcode => undef,
99             error => undef
100             } if $self->get_accounting();
101              
102             $self->_kid_signature($kid,@args);
103             #
104             # increment the KIDS cntr.
105             $_KIDS++;
106              
107             #
108             # return the pid!
109             return $kid;
110             }
111             sub _kidstopped {
112             # keep track
113             my ($self,$kid,$err) = @_;
114             $self->_dbmsg(DB_HIGH, "KIDSTOPPED: $kid");
115             $self->_dbmsg(DB_HIGH, "KIDS: $_KIDS, (" . join(',',keys %_KIDS) . ")");
116             return unless exists $_KIDS{$kid};
117             if($self->get_accounting()) {
118             $self->{_results}{$kid}{status} = 'terminated';
119             $self->_kid_err($kid,$err) if defined $err;
120             }
121             $self->_dbmsg(DB_LOW,"CHILD: $kid ENDING");
122             delete $_KIDS{$kid};
123             return --$_KIDS;
124             }
125             sub kids {
126             return wantarray() ? keys %_KIDS : $_KIDS;
127             }
128             sub kid_time {
129             my ($self,$kid) = @_;
130             return time unless exists $_KIDS{$kid};
131             return $_KIDS{$kid};
132             }
133             sub _pid {
134             return $$;
135             }
136              
137             #
138             # Child Accounting
139             sub clear_results {
140             my ($self) = @_;
141             return unless $self->get_accounting();
142             delete $self->{_results} if exists $self->{_results};
143             $self->{_chi}->clear();
144             }
145              
146             # Always return copies
147             sub get_results {
148             my ($self,$key) = @_;
149             return unless $self->get_accounting();
150             if( !exists $self->{_results} ) {
151             warn "something successfully did not happen\n";
152             return;
153             }
154             if( defined $key ) {
155             if( $self->{_results}{$key} ) {
156             return {
157             %{ $self->{_results}{$key} },
158             'return' => $self->{_chi}->get($key),
159             };
160             }
161             else {
162             warn "attempt to access undefined child '$key'\n";
163             return;
164             }
165             }
166             else {
167             my %local_copy = %{ $self->{_results} };
168             foreach my $kid ( keys %local_copy ) {
169             $local_copy{$kid}->{return} = $self->{_chi}->get( $kid );
170             }
171             return \%local_copy;
172             }
173             }
174              
175             sub _kid_err {
176             my ($self,$kid,$err) = @_;
177             return unless $self->get_accounting();
178             push @{ $self->{_results}{$kid}{error} }, $err;
179             }
180             sub _kid_exitcode {
181             my ($self,$kid,$ec) = @_;
182             return unless $self->get_accounting();
183             $self->{_results}{$kid}{exitcode} = $ec;
184             }
185             sub _kid_return {
186             my ($self,$kid,$return) = @_;
187             return unless $self->get_accounting();
188             $self->{_chi}->set( $kid, $return, 'never' );
189             }
190             sub _kid_signature {
191             my ($self,$kid,@args) = @_;
192             return unless $self->get_accounting() && $self->get_trackargs();
193             $self->{_results}{$kid}{signature} = freeze \@args;
194             }
195             sub _kid_status {
196             my ($self,$kid) = @_;
197             return unless $self->get_accounting();
198             foreach my $err ( @{ $self->{_results}{$kid}{error} } ) {
199             $self->_dbmsg(DB_HIGH, "ChildError: $kid -> $err");
200             }
201             }
202              
203             }
204              
205             # Class Methods
206             sub DESTROY { }
207              
208             sub new {
209             # Constructor
210             # Builds our initial Fork Object;
211             my ($proto,@args) = @_;
212             my $proto_is_obj = ref $proto;
213             my $class = $proto_is_obj || $proto;
214             my $self = bless {}, $class;
215             # take care of capitalization:
216             my %args=();
217             while(@args) {
218             my $k = shift @args;
219             my $v = shift @args;
220             ($k) = ($k =~ /^\s*_?(.*)$/);
221             $args{lc($k)}=$v;
222             }
223             # now take care of our initialization
224             foreach my $attr ($self->_attributes()) {
225             my ($arg) = ($attr =~ /^_?(.*)/);
226             # first see its in our argument list
227             if(exists $args{$arg} && $self->_can('init',$attr)) {
228             $self->{$attr} = $args{$arg};
229             }
230             # if not, check to see if we're copying an
231             # object. Also, make sure we can copy it!
232             elsif($proto_is_obj && $self->_can('copy', $attr)) {
233             $self->{$attr} = $proto->{$attr};
234             }
235             # or, just use the default!
236             else {
237             $self->{$attr} = $self->_default($attr);
238             }
239             }
240             # set the parent pid
241             $self->set_parentpid($$);
242             $self->_dbmsg(DB_HIGH,'FORK OBJECT CREATED');
243              
244             # Create the Cache
245             $self->{_chi} = CHI->new( driver => 'File', root_dir => '/tmp', namespace => "PFC-$$" );
246             return $self;
247             }
248              
249              
250             sub _overLoad {
251             # this is a cheap linux only hack
252             # I will be replacing this as soon as I have time
253             my $CMDTOCHECK = '/usr/bin/uptime';
254             my ($self) = shift;
255             return FALSE unless $self->get_watchload();
256             open(LOAD, "$CMDTOCHECK |") or return FALSE;
257             local $/ = undef;
258             chomp(local $_ = );
259             close LOAD;
260             if(/load average\:\s+(\d+\.\d+)/m) {
261             my $current = $1;
262             my $MAXLOAD = $self->get_maxload();
263             if ($current >= $MAXLOAD) {
264             $self->_dbmsg(DB_LOW,"OVERLOAD: Current: $current, Max: $MAXLOAD, RETURNING TRUE");
265             return TRUE;
266             }
267             $self->_dbmsg(DB_LOW,"OVERLOAD: Current: $current, Max: $MAXLOAD, RETURNING FALSE");
268             return FALSE;
269             }
270             $self->_dbmsg(DB_LOW,'OVERLOAD: ERROR READING LOAD AVERAGE, RETURNING FALSE');
271             return FALSE;
272             }
273              
274             sub _tooManyKids {
275             # determine if there are too many forks
276             my ($self) = @_;
277             my $kids = $self->kids;
278             my $MAXKIDS = $self->get_maxkids();
279             my $MINKIDS = $self->get_minkids();
280              
281             #
282             # Figure out how to do this check.
283             if($self->get_watchload) {
284             $self->_dbmsg(DB_MED,'TOOMANYKIDS - LOAD CHECKING');
285             if($self->get_watchcount) {
286             if(!$self->_overLoad && ($kids < $MAXKIDS)) {
287             $self->_dbmsg(DB_LOW,"TOOMANYKIDS - MAX: $MAXKIDS, Kids: $kids, Return: FALSE");
288             return FALSE;
289             }
290             $self->_dbmsg(DB_LOW,"TOOMANYKIDS - MAX: $MAXKIDS, Kids: $kids, Return: TRUE");
291             return TRUE;
292             }
293             else {
294             $self->_dbmsg(DB_MED,'TOOMANYKIDS - CHECKING LOAD, NOT CHECKING COUNT');
295             if(!$self->_overLoad) {
296             $self->_dbmsg(DB_LOW,"TOOMANYKIDS - Kids: $kids, UNCHECKED RETURNING FALSE");
297             return FALSE;
298             }
299             if($self->kids < $self->get_minkids) {
300             $self->_dbmsg(DB_LOW, "TOOMANYKIDS - OVERLOAD BUT REACHING MINIMUM KIDS!");
301             return FALSE;
302             }
303             $self->_dbmsg(DB_LOW,"TOOMANYKIDS - Kids: $kids, UNCHECKED RETURNING TRUE");
304             return TRUE;
305             }
306             } # end of watchload
307             else {
308             # not watching the load, stick to the
309             # maxforks attribute
310             $self->_dbmsg(DB_LOW,"TOOMANYKIDS - NOT CHECKING LOAD/MEM/CPU - Kids: $kids MAX: $MAXKIDS");
311             if($kids >= $self->get_maxkids()) {
312             $self->_dbmsg(DB_MED,'TOOMANYKIDS - RETURN TRUE');
313             return TRUE;
314             } else {
315             $self->_dbmsg(DB_MED,'TOOMANYKIDS - RETURN FALSE');
316             return FALSE;
317             }
318             }
319              
320             # if we get to this point something is wrong, return true
321             return TRUE;
322             }
323              
324             sub _check {
325             #
326             # this function is here to make sure we don't
327             # freeze up eventually. It should be all good.
328             my $self = shift;
329             $self->{_checked}++;
330             return if $self->get_check_at > $self->get_checked;
331             foreach my $pid ( $self->kids ) {
332             my $alive = kill 0, $pid;
333             if($alive) {
334             my $start = $self->kid_time($pid);
335             if(time - $start > $self->get_processtimeout()) {
336             $self->_kid_err($pid,'process timeout');
337             kill 15, $pid;
338             $self->_kid_status($pid);
339             }
340             }
341             else {
342             $self->_dbmsg(DB_INFO, "Child ($pid) evaded the reaper. Caught by _check()\n");
343             $self->_kidstopped($pid,'evaded the reaper');
344             }
345             }
346             $self->{_checked} = 0;
347             }
348              
349             sub run {
350             # self and args go in, run the code ref or die if
351             # the code ref isn't set
352             my ($self,@args) = @_;
353              
354             #
355             # Allow a user to pass a CODE Ref as the first argument,
356             # default to legacy CODE Parameter.
357             my $codeRef = shift @args;
358              
359             #
360             # If it's not a code ref, put it back on args and get the code ref.
361             if( ref $codeRef ne 'CODE' ) {
362             unshift @args, $codeRef;
363             $codeRef = $self->get_code();
364             }
365              
366             my $typeCodeRef = ref $codeRef;
367             die "CANNOT RUN A $typeCodeRef IN run()\n" unless $typeCodeRef eq 'CODE';
368              
369             # return if our parent has died
370             unless($self->_parentAlive()) {
371             $self->_dbmsg(DB_MED, 'PARENT IS NOT ALIVE: ' . $self->get_parentpid);
372             return;
373             }
374              
375             # We might call _check();
376             $self->_check();
377              
378             # wait for childern to die if we have too many
379             if($self->get_method =~ /block/) {
380             $self->waitforkids() if $self->_tooManyKids;
381             }
382             elsif($self->_tooManyKids) {
383             $self->_kidstopped(wait);
384             }
385             else {
386             #
387             # Due limitations with the speed of process creation
388             # on various modern OS's, its best to limit the maximum number
389             # of processes created per second to 100
390             select undef, undef, undef, 0.01;
391             }
392              
393             # Protect us from zombies
394             $SIG{CHLD} = sub { $self->_REAPER };
395              
396             # fork();
397             my $pid = fork();
398             # check for errors
399             die "*\n* FORK ERROR !!\n*\n" unless defined $pid;
400              
401             # if we're the parent return
402             if($pid > 0) {
403             return $self->_kidstarted($pid,@args);
404             }
405              
406             # we're the child
407             local $0 = ' Child of ' . $self->get_name;
408             $self->_dbmsg(DB_HIGH,'Running Fork Code');
409             my @trapSignals = qw(INT KILL TERM QUIT HUP ABRT);
410             my @return = ();
411             my $eval_error = undef;
412             try {
413             foreach my $sig (@trapSignals) {
414             $SIG{$sig} = sub { $self->_REAPER; };
415             }
416             @return = $codeRef->(@args);
417             } catch {
418             $eval_error = shift;
419             if($eval_error =~ /timeout/) {
420             $self->_kid_err($$, 'alarmed out');
421             }
422             };
423             $self->_kid_return( $$, scalar @return > 1 ? \@return : shift @return );
424              
425             my $CODE = $eval_error ? 1 : 0;
426             exit $CODE;
427             }
428              
429              
430             sub waitforkids {
431             # We'll just rely on our SIG{'CHLD'} handler to actually
432             # disperse of the children, so all we have to do is wait
433             # here.
434             my $self = shift;
435             # using select here because it doesn't interfere
436             # with any signals in the program
437             while( $self->kids ) {
438             $self->_check;
439             select undef, undef, undef, 1;
440             $self->_REAPER;
441             }
442             return TRUE;
443             }
444             # Provided for legacy support
445             sub cleanup {
446             my $self = shift;
447             return $self->waitforkids;
448             }
449              
450             sub _REAPER {
451             # our SIGCHLD Handler
452             # Code from the Perl Cookbook page 592
453             # - heavily modified
454             my $self = shift;
455              
456             my $pid = wait;
457              
458             if($pid > 0) {
459             # a pid did something,
460             $self->_dbmsg(DB_HIGH,"_REAPER found a child ($pid)!!!!!");
461             my $rc = undef;
462             if(!WIFEXITED($?)) {
463             $rc=1;
464             $self->_dbmsg(DB_INFO, "Child ($pid) exitted abnormally");
465             $self->_kid_err($pid,'abnormal process termination');
466             }
467             elsif( WIFSIGNALED($?) ) {
468             $self->_kid_err($pid,"Uncaught signal: " . WTERMSIG($?));
469             }
470             if(not defined $rc) {
471             $rc = WEXITSTATUS($?);
472             }
473             $self->_kid_exitcode($pid,$rc);
474             $self->_kidstopped($pid);
475             }
476             $SIG{CHLD} = sub {$self->_REAPER};
477             }
478              
479             sub _parentAlive {
480             # check to see if the parent is still alive
481             my $self = shift;
482             return kill 0, $self->get_parentpid();
483             }
484              
485             sub AUTOLOAD {
486             # AUTOLOAD our get/set methods
487             no strict 'refs';
488             return if $AUTOLOAD =~ /DESTROY/;
489             my ($self,$arg) = @_;
490              
491             # get routines
492             if($AUTOLOAD =~ /get(_.*)/ && $self->_can('get', $1)) {
493             my $attr = lc($1);
494             *{$AUTOLOAD} = sub { return $_[0]->{$attr}; };
495             return $self->{$attr};
496             }
497              
498             # set routines
499             if($AUTOLOAD =~ /set(_.*)/ && $self->_can('set', $1)){
500             my $attr = lc($1);
501             *{$AUTOLOAD} = sub {
502             my ($self,$val) = @_;
503             $self->{$attr} = $val;
504             return $self->{$attr};
505             };
506             $self->{$attr} = $arg;
507             return $self->{$attr};
508             }
509              
510             warn "AUTOLOAD Could not find method $AUTOLOAD\n";
511             return;
512             }
513              
514             # DEBUG AND TESTING SUBS
515             sub _print_me {
516             my $self = shift;
517             my $class = ref $self;
518             print "$class Object:\n";
519             foreach my $attr ($self->_attributes) {
520             my ($pa) = ($attr =~ /^_(.*)/);
521             $pa = "\L\u$pa";
522             my $val = ref $self->{$attr} || $self->{$attr};
523             print "\t$pa: $val\n";
524             }
525             print "\n";
526             }
527              
528             sub _dbmsg {
529             # print debugging messages:
530             my ($self,$pri,@MSGS) = @_;
531             return unless $self->get_debug() >= $pri;
532             foreach my $msg (@MSGS) {
533             $msg =~ s/[\cM\r\n]+//g;
534             my $date = scalar localtime;
535             print STDERR "$date - $msg\n";
536             }
537             return TRUE;
538             }
539              
540             return 1;
541             1
542             __END__