File Coverage

blib/lib/Parallel/Prefork/SpareWorkers.pm
Criterion Covered Total %
statement 47 51 92.1
branch 8 10 80.0
condition 7 13 53.8
subroutine 13 14 92.8
pod 3 4 75.0
total 78 92 84.7


line stmt bran cond sub pod time code
1             package Parallel::Prefork::SpareWorkers;
2              
3 1     1   809944 use strict;
  1         3  
  1         47  
4 1     1   5 use warnings;
  1         1  
  1         41  
5              
6 1     1   6 use Exporter qw(import);
  1         2  
  1         48  
7              
8 1     1   12555 use List::MoreUtils qw(uniq);
  1         21776  
  1         140  
9              
10 1     1   14 use base qw/Parallel::Prefork/;
  1         3  
  1         773  
11              
12 1     1   7 use constant STATUS_NEXIST => '.';
  1         2  
  1         65  
13 1     1   6 use constant STATUS_IDLE => '_';
  1         2  
  1         644  
14              
15             our %EXPORT_TAGS = (
16             status => [ qw(STATUS_NEXIST STATUS_IDLE) ],
17             );
18             our @EXPORT_OK = uniq sort map { @$_ } values %EXPORT_TAGS;
19             $EXPORT_TAGS{all} = \@EXPORT_OK;
20              
21             __PACKAGE__->mk_accessors(qw/min_spare_workers max_spare_workers scoreboard heartbeat/);
22              
23             sub new {
24 1     1 1 1576 my $klass = shift;
25 1         9 my $self = $klass->SUPER::new(@_);
26 1 50       6 die "mandatory option min_spare_workers not set"
27             unless $self->{min_spare_workers};
28 1   33     4 $self->{max_spare_workers} ||= $self->max_workers;
29 1   50     11 $self->{heartbeat} ||= 0.25;
30 1   33     6 $self->{scoreboard} ||= do {
31 1         1614 require 'Parallel/Prefork/SpareWorkers/Scoreboard.pm';
32 1   50     81 Parallel::Prefork::SpareWorkers::Scoreboard->new(
33             $self->{scoreboard_file} || undef,
34             $self->max_workers,
35             );
36             };
37 1         6 $self;
38             }
39              
40             sub start {
41 1     1 1 13 my $self = shift;
42 1         10 my $ret = $self->SUPER::start();
43 1 50       8 unless ($ret) {
44             # child process
45 0         0 $self->scoreboard->child_start();
46 0         0 return;
47             }
48 1         6 return 1;
49             }
50              
51             sub num_active_workers {
52 101     101 0 259 my $self = shift;
53 1010 100       16737 scalar grep {
54 101         787 $_ ne STATUS_NEXIST && $_ ne STATUS_IDLE
55             } $self->scoreboard->get_statuses;
56             }
57              
58             sub set_status {
59 0     0 1 0 my ($self, $status) = @_;
60 0         0 $self->scoreboard->set_status($status);
61             }
62              
63             sub _decide_action {
64 97     97   458 my $self = shift;
65 97         396 my $spare_workers = $self->num_workers - $self->num_active_workers;
66 97 100 100     833 return 1
67             if $spare_workers < $self->min_spare_workers
68             && $self->num_workers < $self->max_workers;
69 85 100       846 return -1
70             if $spare_workers > $self->max_spare_workers;
71 75         947 return 0;
72             }
73              
74             sub _on_child_reap {
75 10     10   53 my ($self, $exit_pid, $status) = @_;
76 10         78 $self->SUPER::_on_child_reap($exit_pid, $status);
77 10         43 $self->scoreboard->clear_child($exit_pid);
78             }
79              
80             sub _max_wait {
81 46     46   88 my $self = shift;
82 46         168 return $self->{heartbeat};
83             }
84              
85             1;
86             __END__