File Coverage

blib/lib/Parallel/Prefork.pm
Criterion Covered Total %
statement 126 154 81.8
branch 44 66 66.6
condition 10 17 58.8
subroutine 24 26 92.3
pod 5 6 83.3
total 209 269 77.7


line stmt bran cond sub pod time code
1             package Parallel::Prefork;
2              
3 22     22   870241 use strict;
  22         56  
  22         910  
4 22     22   125 use warnings;
  22         43  
  22         738  
5              
6 22     22   703 use 5.008_001;
  22         79  
  22         980  
7              
8 22     22   112 use base qw/Class::Accessor::Lite/;
  22         26  
  22         37576  
9 22     22   28133 use List::Util qw/first max min/;
  22         37  
  22         2819  
10 22     22   35421 use Proc::Wait3 ();
  22         189666  
  22         625  
11 22     22   22865 use Time::HiRes ();
  22         42958  
  22         999  
12              
13             use Class::Accessor::Lite (
14 22         262 rw => [ qw/max_workers spawn_interval err_respawn_interval trap_signals signal_received manager_pid on_child_reap before_fork after_fork/ ],
15 22     22   163 );
  22         56  
16              
17             our $VERSION = '0.17';
18              
19             sub new {
20 21     21 1 13353 my $klass = shift;
21 21 50       137 my $opts = @_ == 1 ? $_[0] : +{ @_ };
22 21         301 my $self = bless {
23             worker_pids => {},
24             max_workers => 10,
25             spawn_interval => 0,
26             err_respawn_interval => 1,
27             trap_signals => {
28             TERM => 'TERM',
29             },
30             signal_received => '',
31             manager_pid => undef,
32             generation => 0,
33             %$opts,
34             _no_adjust_until => 0, # becomes undef in wait_all_children
35             }, $klass;
36             $SIG{$_} = sub {
37 4     4   53 $self->signal_received($_[0]);
38 21         91 } for keys %{$self->trap_signals};
  21         148  
39 21     40   1054 $SIG{CHLD} = sub {};
  40         661  
40 21         91 $self;
41             }
42              
43             sub start {
44 22     22 1 18291 my ($self, $cb) = @_;
45            
46 22         108 $self->manager_pid($$);
47 22         213 $self->signal_received('');
48 22         114 $self->{generation}++;
49            
50 22 50       732 die 'cannot start another process while you are in child process'
51             if $self->{in_child};
52            
53             # main loop
54 22         98 while (! $self->signal_received) {
55 178   66     6561 my $action = $self->{_no_adjust_until} <= Time::HiRes::time()
56             && $self->_decide_action;
57 178 100       1731 if ($action > 0) {
    100          
58             # start a new worker
59 115 100       396 if (my $subref = $self->before_fork) {
60 38         452 $subref->($self);
61             }
62 115         195137 my $pid = fork;
63 115 50       4091 unless (defined $pid) {
64 0         0 warn "fork failed:$!";
65 0         0 $self->_update_spawn_delay($self->err_respawn_interval);
66 0         0 next;
67             }
68 115 100       1142 unless ($pid) {
69             # child process
70 17         1994 $self->{in_child} = 1;
71 17         313 $SIG{$_} = 'DEFAULT' for keys %{$self->trap_signals};
  17         1783  
72 17         4268 $SIG{CHLD} = 'DEFAULT'; # revert to original
73 17 50       860 exit 0 if $self->signal_received;
74 17 100       845 if ($cb) {
75 7         80 $cb->();
76 7         715 $self->finish();
77             }
78 10         987 return;
79             }
80 98 100       8882 if (my $subref = $self->after_fork) {
81 31         1607 $subref->($self, $pid);
82             }
83 98         6158 $self->{worker_pids}{$pid} = $self->{generation};
84 98         2526 $self->_update_spawn_delay($self->spawn_interval);
85             } elsif ($action < 0) {
86             # stop an existing worker
87 5         200 kill(
88             $self->_action_for('TERM')->[0],
89 5         57 (keys %{$self->{worker_pids}})[0],
90             );
91 5         30 $self->_update_spawn_delay($self->spawn_interval);
92             }
93 161 50       795 $self->{__dbg_callback}->()
94             if $self->{__dbg_callback};
95 161 100 66     3539 if (my ($exit_pid, $status)
96             = $self->_wait(! $self->{__dbg_callback} && $action <= 0)) {
97 23         207 $self->_on_child_reap($exit_pid, $status);
98 23 100 100     1082 if (delete($self->{worker_pids}{$exit_pid}) == $self->{generation}
99             && $status != 0) {
100 5         31 $self->_update_spawn_delay($self->err_respawn_interval);
101             }
102             }
103             }
104             # send signals to workers
105 5 50       101 if (my $action = $self->_action_for($self->signal_received)) {
106 5         24 my ($sig, $interval) = @$action;
107 5 50       67 if ($interval) {
108             # fortunately we are the only one using delayed_task, so implement
109             # this setup code idempotent and replace the already-registered
110             # callback (if any)
111 0         0 my @pids = sort keys %{$self->{worker_pids}};
  0         0  
112             $self->{delayed_task} = sub {
113 0     0   0 my $self = shift;
114 0         0 my $pid = shift @pids;
115 0         0 kill $sig, $pid;
116 0 0       0 if (@pids == 0) {
117 0         0 delete $self->{delayed_task};
118 0         0 delete $self->{delayed_task_at};
119             } else {
120 0         0 $self->{delayed_task_at} = Time::HiRes::time() + $interval;
121             }
122 0         0 };
123 0         0 $self->{delayed_task_at} = 0;
124 0         0 $self->{delayed_task}->($self);
125             } else {
126 5         64 $self->signal_all_children($sig);
127             }
128             }
129            
130 5         110 1; # return from parent process
131             }
132              
133             sub finish {
134 7     7 1 322 my ($self, $exit_code) = @_;
135 7 50       70 die "\$parallel_prefork->finish() shouln't be called within the manager process\n"
136             if $self->manager_pid() == $$;
137 7   50     5358 exit($exit_code || 0);
138             }
139              
140             sub signal_all_children {
141 5     5 1 29 my ($self, $sig) = @_;
142 5         12 foreach my $pid (sort keys %{$self->{worker_pids}}) {
  5         94  
143 20         13450 kill $sig, $pid;
144             }
145             }
146              
147             sub num_workers {
148 301     301 0 892 my $self = shift;
149 301         378 return scalar keys %{$self->{worker_pids}};
  301         2765  
150             }
151              
152             sub _decide_action {
153 163     163   230 my $self = shift;
154 163 100       21620 return 1 if $self->num_workers < $self->max_workers;
155 47         728 return 0;
156             }
157              
158             sub _on_child_reap {
159 42     42   291 my ($self, $exit_pid, $status) = @_;
160 42         333 my $cb = $self->on_child_reap;
161 42 100       520 if ($cb) {
162 10         19 eval {
163 10         67 $cb->($self, $exit_pid, $status);
164             };
165             # XXX - hmph, what to do here?
166             }
167             }
168              
169             # runs delayed tasks (if any) and returns how many seconds to wait
170             sub _handle_delayed_task {
171 180     180   395 my $self = shift;
172 180         369 while (1) {
173             return undef
174 180 50       875 unless $self->{delayed_task};
175 0         0 my $timeleft = $self->{delayed_task_at} - Time::HiRes::time();
176 0 0       0 return $timeleft
177             if $timeleft > 0;
178 0         0 $self->{delayed_task}->($self);
179             }
180             }
181              
182             # returns [sig_to_send, interval_bet_procs] or undef for given recved signal
183             sub _action_for {
184 10     10   96 my ($self, $sig) = @_;
185 10 50       112 my $t = $self->{trap_signals}{$sig}
186             or return undef;
187 10 50       111 $t = [$t, 0] unless ref $t;
188 10         51 return $t;
189             }
190              
191             sub wait_all_children {
192 4     4 1 1520 my ($self, $timeout) = @_;
193 4         15 $self->{_no_adjust_until} = undef;
194              
195             my $wait_loop = sub {
196 4     4   21 while (%{$self->{worker_pids}}) {
  23         903  
197 19 50       100 if (my ($pid) = $self->_wait(1)) {
198 19 50       200 if (delete $self->{worker_pids}{$pid}) {
199 19         177 $self->_on_child_reap($pid, $?);
200             }
201             }
202             }
203 4         65 };
204              
205 4 50       32 if ($timeout) {
206 0         0 local $@;
207 0         0 my $is_timeout = 0;
208 0         0 eval {
209             local $SIG{ALRM} = sub {
210 0     0   0 $is_timeout = 1;
211 0         0 die "timeout";
212 0         0 };
213 0         0 alarm($timeout);
214 0         0 $wait_loop->();
215 0         0 alarm(0);
216             };
217 0 0 0     0 die $@
218             if $@ && ! $is_timeout;
219             } else {
220 4         28 $wait_loop->();
221             }
222 4         27 return $self->num_workers;
223             }
224              
225             sub _update_spawn_delay {
226 108     108   3207 my ($self, $secs) = @_;
227 108 50       1635 $self->{_no_adjust_until} = $secs ? Time::HiRes::time() + $secs : 0;
228             }
229              
230             # wrapper function of Proc::Wait3::wait3 that executes delayed task if any. assumes wantarray == 1
231             sub _wait {
232 180     180   572 my ($self, $blocking) = @_;
233 180 100       786 if (! $blocking) {
234 98         1111 $self->_handle_delayed_task();
235 98         1800 return Proc::Wait3::wait3(0);
236             } else {
237 82         664 my $delayed_task_sleep = $self->_handle_delayed_task();
238 82 50 66     374 my $delayed_fork_sleep =
239             $self->_decide_action() > 0 && defined $self->{_no_adjust_until}
240             ? max($self->{_no_adjust_until} - Time::HiRes::time(), 0)
241             : undef;
242 82         1111 my $sleep_secs = min grep { defined $_ } (
  246         1031  
243             $delayed_task_sleep,
244             $delayed_fork_sleep,
245             $self->_max_wait(),
246             );
247 82 100       334 if (defined $sleep_secs) {
248             # wait max sleep_secs or until signalled
249 46         9920370 select(undef, undef, undef, $sleep_secs);
250 46 100       3986 if (my @r = Proc::Wait3::wait3(0)) {
251 10         125 return @r;
252             }
253             } else {
254 36 100       5866522 if (my @r = Proc::Wait3::wait3(1)) {
255 32         441 return @r;
256             }
257             }
258 40         1754 return +();
259             }
260             }
261              
262             sub _max_wait {
263 36     36   484 return undef;
264             }
265              
266             1;
267              
268             __END__