File Coverage

blib/lib/Parallel/MPM/Prefork.pm
Criterion Covered Total %
statement 136 251 54.1
branch 37 140 26.4
condition 29 94 30.8
subroutine 23 32 71.8
pod 8 8 100.0
total 233 525 44.3


line stmt bran cond sub pod time code
1             package Parallel::MPM::Prefork;
2              
3 4     4   111553 use 5.010;
  4         18  
  4         207  
4 4     4   29 use strict;
  4         7  
  4         166  
5 4     4   25 use warnings;
  4         43  
  4         136  
6 4     4   25 use Exporter 'import';
  4         8  
  4         151  
7 4     4   22 use Fcntl;
  4         18  
  4         1736  
8 4     4   3602 use POSIX qw(:signal_h :sys_wait_h sigprocmask);
  4         29452  
  4         27  
9 4     4   10480 use Socket;
  4         19080  
  4         2978  
10 4     4   3959 use Storable qw(nfreeze thaw);
  4         14024  
  4         288  
11              
12 4     4   3977 use Data::Dumper;
  4         48586  
  4         386  
13              
14             use constant {
15 4         391 MAX_SERVERS => 73,
16             MAX_SPARE => 10,
17             MIN_SPARE => 5,
18             START_SERVERS => 5,
19 4     4   70 };
  4         8  
20              
21 4     4   21 use constant CLD_DATA_HDR_FMT => 'iSCL'; # PID, exit code, thaw, data length
  4         10  
  4         289  
22 4     4   23 use constant CLD_DATA_HDR_LEN => length pack CLD_DATA_HDR_FMT, 0;
  4         9  
  4         15678  
23              
24             our $VERSION = '0.14';
25              
26             our (@EXPORT_OK, @EXPORT_TAGS) = ();
27             our @EXPORT =
28             qw(
29             pf_init
30             pf_done
31             pf_whip_kids
32             pf_kid_new
33             pf_kid_busy
34             pf_kid_yell
35             pf_kid_idle
36             pf_kid_exit
37             );
38              
39             our $error;
40              
41             my $pgid;
42             my $done;
43             my $debug;
44             my $am_parent;
45             my $timeout;
46              
47             my $max_servers;
48             my $max_spare_servers;
49             my $min_spare_servers;
50             my $start_servers;
51              
52             my $parent_stat_fh;
53             my $parent_data_fh;
54              
55             my $child_stat_fh;
56             my $child_stat_fd;
57              
58             my $child_data_fh;
59             my $child_data_fd;
60              
61             my $child_fds;
62              
63             my $child_data_hook;
64             my $child_sigh;
65              
66             my $dhook_in_main;
67             my $dhook_pid;
68              
69             my $num_busy;
70             my $num_idle;
71             my %busy;
72             my %idle;
73              
74             my $sigset_bak = POSIX::SigSet->new();
75             my $sigset_all = POSIX::SigSet->new();
76             $sigset_all->fillset();
77              
78             #
79             # Public interface
80             #
81              
82             sub pf_init {
83 3     3 1 56 my %opts = @_;
84              
85 3         6 eval {
86 3         57 setpgrp();
87 3         23 $pgid = getpgrp();
88              
89 3         11 $timeout = $am_parent = 1;
90 3         11 $dhook_pid = $done = $num_busy = $num_idle = 0;
91 3         11 $child_fds = $child_stat_fd = $child_data_fd = $error = '';
92              
93 3         11 undef %busy;
94 3         8 undef %idle;
95              
96 3         9 $debug = $opts{debug};
97              
98             # Just like Apache, we allow start_servers to be larger than
99             # max_spare_servers to accommodate for high initial load.
100 3   50     24 $max_servers = int($opts{max_servers} // MAX_SERVERS);
101 3   100     16 $max_spare_servers = int($opts{max_spare_servers} // MAX_SPARE);
102 3   100     21 $min_spare_servers = int($opts{min_spare_servers} // MIN_SPARE);
103 3   100     86 $start_servers = int($opts{start_servers} // START_SERVERS);
104              
105 3 50 33     50 if ($max_servers <= 0 || $max_spare_servers <= 0 ||
      33        
      33        
106             $min_spare_servers <= 0 || $start_servers <= 0) {
107 0         0 die "All child server process numbers must be >= 1!";
108             }
109              
110 3 50       12 if ($max_servers < $min_spare_servers) {
111 0         0 $max_servers = $min_spare_servers;
112 0         0 warn "Adjusted max_servers to $max_servers";
113             }
114              
115 3 50       12 if ($max_spare_servers < $min_spare_servers) {
116 0         0 $max_spare_servers = $min_spare_servers;
117 0         0 warn "Adjusted max_spare_servers to $max_spare_servers";
118             }
119              
120 3 50       21 if ($start_servers > $max_servers) {
    50          
121 0         0 $start_servers = $max_servers;
122 0         0 warn "Adjusted start_servers to $start_servers";
123             }
124             elsif ($start_servers < $min_spare_servers) {
125 0         0 $start_servers = $min_spare_servers;
126 0         0 warn "Adjusted start_servers to $start_servers";
127             }
128              
129 3 100       12 if (defined($child_data_hook = $opts{child_data_hook})) {
130 2 50       14 ref $child_data_hook eq 'CODE' or
131             die "child_data_hook must be a code reference";
132             }
133              
134 3         19 $child_sigh = _make_child_sigh($opts{child_sigh});
135 3         11 $dhook_in_main = $opts{data_hook_in_main};
136              
137 3 100       9 if ($child_data_hook) {
138 2         10 _socketpair($parent_data_fh, $child_data_fh);
139 2 50       8 if ($dhook_in_main) {
140 2         12 vec($child_data_fd, fileno $child_data_fh, 1) = 1;
141 2         8 $child_fds |= $child_data_fd;
142             }
143             else {
144 0         0 $dhook_pid = _fork_data_hook_helper($parent_data_fh, $child_data_fh);
145             }
146             }
147              
148 3         9 _socketpair($parent_stat_fh, $child_stat_fh);
149 3         13 vec($child_stat_fd, fileno $child_stat_fh, 1) = 1;
150 3         11 $child_fds |= $child_stat_fd;
151              
152 3         67 $SIG{CHLD} = \&_sig_chld;
153 3         14 _wait_for_children()
154             };
155              
156 3 50       28 if ($@) {
157 0         0 $error = $@;
158 0         0 return undef;
159             }
160              
161 3         19 return $parent_stat_fh;
162             }
163              
164             sub pf_whip_kids ($;$) {
165 0     0 1 0 my $code = shift;
166 0         0 my $args = shift;
167              
168 0 0       0 return 0 if $done;
169              
170 0 0       0 if ($start_servers) {
    0          
    0          
171 0         0 do { _spawn($code, $args) } until ! --$start_servers;
  0         0  
172             }
173             elsif ((my $lack = $min_spare_servers - $num_idle) > 0) {
174 0 0       0 _log_child_action('Forking', $lack) if $debug;
175 0         0 for (1 .. $lack) {
176 0 0 0     0 last if (_spawn($code, $args) // return undef) < 0;
177             }
178             }
179             elsif ((my $plus = $num_idle - $max_spare_servers) > 0) {
180 0 0       0 _log_child_action('Killing', $plus) if $debug;
181 0         0 _kill_idlers($plus);
182             }
183              
184 0 0       0 _log_child_status() if $debug;
185 0         0 _read_child_drool();
186              
187 0         0 return 1;
188             }
189              
190             sub pf_kid_new () {
191 2     2 1 24 while (!$done) {
192 2 50       6 if ($start_servers) {
    0          
    0          
193 2         4 $start_servers--;
194 2         12 return _spawn();
195             }
196             elsif ((my $lack = $min_spare_servers - $num_idle) > 0) {
197 0 0       0 _log_child_action('Lacking', $lack, "Forking 1 child.\n") if $debug;
198 0         0 my $pid = _spawn();
199 0 0 0     0 return $pid if !defined $pid || $pid >= 0;
200             }
201             elsif ((my $plus = $num_idle - $max_spare_servers) > 0) {
202 0 0       0 _log_child_action('Killing', $plus) if $debug;
203 0         0 _kill_idlers($plus);
204             }
205              
206 0 0       0 _log_child_status() if $debug;
207 0         0 _read_child_drool();
208             }
209              
210 0         0 return -1;
211             }
212              
213             sub pf_kid_busy {
214 0 0   0 1 0 syswrite $parent_stat_fh, "R$$\n" if ! $am_parent;
215             }
216              
217             sub pf_kid_idle {
218 0 0   0 1 0 syswrite $parent_stat_fh, "S$$\n" if ! $am_parent;
219             }
220              
221             sub pf_kid_yell($;$$) {
222 1     1 1 3 my ($data, $thaw, $exitcode) = @_;
223              
224 1 50 33     122 return undef if $am_parent || !($parent_data_fh && ref $data);
      33        
225              
226 1   33     20 $data = eval { nfreeze($data) } // do {
  1         38  
227 0         0 warn "ERROR: Could not nfreeze() data from pid $$: ", $@;
228 0         0 $error = $@;
229 0         0 return undef;
230             };
231              
232 1 50 50     354 syswrite $parent_data_fh,
233             pack(CLD_DATA_HDR_FMT, $$, $exitcode // 256, $thaw ? 1 : 0, length $data)
234             . $data;
235             }
236              
237             sub pf_kid_exit(;$$$) {
238 1     1 1 42 my ($exitcode, $data, $thaw) = @_;
239              
240 1 50       29 return if $am_parent;
241              
242 1   50     19 ($exitcode //= 0) &= 0xff;
243              
244 1         14 pf_kid_yell($data, $thaw, $exitcode);
245              
246 1         354 exit $exitcode;
247             }
248              
249             sub pf_done (;$) {
250 2     2 1 22 my $exitcode = shift;
251              
252 2 50 33     70 return if !$am_parent || $done++;
253              
254 2         51 local $SIG{CHLD} = 'IGNORE';
255 2         39 local $SIG{TERM} = 'IGNORE';
256 2         43 kill 'TERM', 0;
257              
258 2         17 my $pid = 0;
259 2         4 my $nbytes;
260              
261 2 50       15 undef $child_fds if ! $child_stat_fh;
262              
263 2   66     5 do {
264 3 100       30 $pid = waitpid -$pgid, WNOHANG if $pid >= 0;
265 3         15 $nbytes = _read_child_data();
266 3 50 33     37 select my $rfds = $child_fds, undef, undef, .1 if !($pid || $nbytes);
267             } while $pid >= 0 || $nbytes;
268              
269 2 50       10 exit $exitcode if defined $exitcode;
270              
271 2         263 undef $_ for ($parent_stat_fh, $child_stat_fh,
272             $parent_data_fh, $child_data_fh);
273             }
274              
275              
276             #
277             # Private parts
278             #
279              
280             sub _make_child_sigh {
281 3   50 3   20 my $child_sigh = shift // return undef;
282              
283 0 0       0 ref $child_sigh eq 'HASH' or die "child_sigh must be a hash reference";
284              
285 0 0       0 if (%$child_sigh) {
286 0         0 my %sig2hnd;
287 0         0 while (my ($sigs, $code) = each %$child_sigh) {
288 0 0 0     0 if (defined $code &&
      0        
289             ref $code ne 'CODE' && $code !~ /^(?:DEFAULT|IGNORE)$/) {
290 0         0 die "child_sigh($sigs) must be a code ref, DEFAULT, IGNORE or undef";
291             }
292 0         0 for (split ' ', $sigs) {
293 0 0       0 $sig2hnd{$_} =
294             exists $SIG{$_} ? $code : die "child_sigh: No such signal: $_";
295             }
296             }
297 0         0 return \%sig2hnd;
298             }
299              
300 0         0 return undef;
301             }
302              
303             sub _socketpair {
304 5 50   5   269 socketpair $_[0], $_[1], AF_UNIX, SOCK_STREAM, PF_UNSPEC
305             or die "ERROR: socketpair(): $!\n";
306 5         39 fcntl $_[1], F_SETFL, fcntl($_[1], F_GETFL, 0) | O_NONBLOCK;
307             }
308              
309             sub _fork_data_hook_helper {
310 0     0   0 my ($parent_data_fh, $child_data_fh) = @_;
311              
312 0         0 sigprocmask(SIG_BLOCK, $sigset_all, $sigset_bak);
313              
314 0   0     0 my $cpid = fork() // die "Could not fork: $!";
315              
316 0 0       0 if ($cpid) {
317 0         0 sigprocmask(SIG_SETMASK, $sigset_bak);
318 0         0 return $cpid;
319             }
320              
321 0         0 undef $parent_data_fh;
322 0         0 $0 .= ' [data_hook_helper]';
323              
324 0         0 $child_fds = '';
325 0         0 vec($child_fds, fileno $child_data_fh, 1) = 1;
326              
327 0         0 while (my ($sig, $hnd) = each %SIG) {
328 0 0 0     0 undef $SIG{$sig} if defined $hnd && $sig ne 'FPE';
329             }
330              
331 0         0 sigprocmask(SIG_SETMASK, $sigset_bak);
332              
333 0         0 while (1) {
334 0         0 select my $rfds = $child_fds, undef, undef, undef;
335 0         0 _read_child_data();
336             }
337             }
338              
339             sub _spawn {
340 2     2   4 my $code = shift;
341 2         4 my $args = shift;
342              
343 2 50       10 if ($num_idle + $num_busy >= $max_servers) {
344 0         0 warn "Server seems busy, consider increasing max_servers.\n";
345 0         0 _log_child_status();
346 0         0 return -1;
347             }
348              
349             # Temporarily block signal delivery until child has installed all handlers
350             # and knows for sure it's not the parent.
351 2         20 sigprocmask(SIG_BLOCK, $sigset_all, $sigset_bak);
352              
353 2         2208 my $cpid = fork();
354              
355 2 100       205 if ($cpid) {
    50          
356             # Parent
357 1         25 $num_idle++;
358 1         52 $idle{$cpid}++;
359             }
360             elsif (defined $cpid) {
361             # Child
362 1         19 undef $am_parent;
363 1         31 undef $child_data_fh;
364 1         170 undef $child_stat_fh;
365              
366 1 50       26 @SIG{keys %$child_sigh} = values %$child_sigh if $child_sigh;
367              
368 1 50       32 if ($code) {
369 0         0 sigprocmask(SIG_SETMASK, $sigset_bak);
370 0   0     0 $code->(@{$args // []});
  0         0  
371 0         0 exit;
372             }
373             }
374              
375 2         83 sigprocmask(SIG_SETMASK, $sigset_bak);
376              
377 2         131 $cpid;
378             }
379              
380             sub _sig_chld {
381             # force select() to return immediately if child exited shortly before
382 1     1   572529 $timeout = 0;
383             }
384              
385             sub _wait_for_children {
386 3     3   7 my $ct;
387 3         44 while ((my $pid = waitpid -$pgid, WNOHANG) > 0) {
388 0 0       0 if ($pid == $dhook_pid) {
389 0         0 warn "ERROR: data_hook_helper exited, forking new one.\n";
390 0         0 $dhook_pid = _fork_data_hook_helper($parent_data_fh, $child_data_fh);
391             }
392             else {
393 0 0       0 delete $busy{$pid} and $num_busy--;
394 0 0       0 delete $idle{$pid} and $num_idle--;
395 0 0       0 warn "PID $pid exited.\n" if $debug;
396             }
397 0         0 $ct++;
398             }
399 3         9 $ct;
400             }
401              
402             sub _read_child_drool {
403 0     0   0 my $status_changed;
404 0   0     0 do {
405 0 0       0 if (select my $rfds = $child_fds, undef, undef, $timeout) {
406 0         0 $status_changed = unpack '%32b*', $rfds & $child_stat_fd;
407 0 0 0     0 if ($dhook_in_main && unpack '%32b*', $rfds & $child_data_fd) {
408 0         0 _read_child_data();
409 0   0     0 $status_changed ||= select $rfds = $child_stat_fd, undef, undef, 0;
410             }
411 0 0       0 _read_child_status() if $status_changed;
412             }
413 0         0 $timeout = 1;
414             } until _wait_for_children() || $status_changed;
415             }
416              
417             # An in-memory scoreboard would surely be nicer ...
418             sub _read_child_status {
419 0     0   0 sigprocmask(SIG_BLOCK, $sigset_all, $sigset_bak);
420 0         0 while (<$child_stat_fh>) {
421 0         0 my ($status, $pid) = unpack 'aA*';
422             # Ignore delayed status messages from no longer existing children
423 0 0 0     0 next unless $busy{$pid} or $idle{$pid};
424 0 0       0 if ($status eq 'R') {
    0          
    0          
425 0 0       0 delete $idle{$pid} and $num_idle--;
426 0 0       0 $busy{$pid}++ or $num_busy++;
427             }
428             elsif ($status eq 'S') {
429 0 0       0 delete $busy{$pid} and $num_busy--;
430 0 0       0 $idle{$pid}++ or $num_idle++;
431             }
432             elsif ($status ne '0') { # 0 = Jeffries tube. cg use only!
433 0         0 warn "ERROR: Dubious status: $_";
434             }
435             }
436 0         0 sigprocmask(SIG_SETMASK, $sigset_bak);
437             }
438              
439             sub _read_child_data {
440 3 100 66 3   30 return undef unless $child_data_fh && fileno $child_data_fh;
441              
442 2         4 my $nbytes = 0;
443 2 50       7 my $chunks = $dhook_in_main ? 3 : ~0; # read at most that many chunks per
444             # call
445              
446 2         23 sigprocmask(SIG_BLOCK, $sigset_all, $sigset_bak);
447              
448             HDR:
449 2   66     46 while ($chunks-- && sysread $child_data_fh, my $header, CLD_DATA_HDR_LEN) {
450 1         13 my ($pid, $exitcode, $thaw, $data_len) = unpack CLD_DATA_HDR_FMT, $header;
451              
452             # Exit code 256 means "undef", minimum nfreeze() data length is 3.
453 1 50 33     30 if ($pid <= 1 || $exitcode > 256 || $thaw > 1 || $data_len < 3) {
      33        
      33        
454 0         0 warn(
455             'ERROR: read corrupted child data: ',
456             "pid:$pid exitcode:$exitcode thaw:$thaw data_len:$data_len",
457             ', skipping all pending data'
458             );
459 0   0     0 $nbytes += sysread($child_data_fh, $header, 16384) || last HDR while 1;
460             }
461              
462 1   33     15 my $cbytes = sysread($child_data_fh, (my $data), $data_len) // do {
463 0         0 warn "ERROR: sysread(): $!";
464 0         0 next HDR;
465             };
466              
467 1         4 $nbytes += $cbytes;
468              
469 1 50       11 if ($cbytes != $data_len) {
470 0         0 warn "ERROR: sysread(): read $cbytes bytes but expected $data_len";
471 0         0 next HDR;
472             }
473              
474             # Don't block signals in the data hook.
475 1         14 sigprocmask(SIG_SETMASK, $sigset_bak);
476             $child_data_hook->(
477             $pid,
478 1         16 ($thaw ? eval { thaw $data } : $data) //
479 1 50 33     18 do {
    50          
480 0         0 warn "ERROR: Could not thaw() data from pid $pid: ", $@;
481 0         0 $error = $data;
482 0         0 undef;
483             },
484             $exitcode <= 255 ? $exitcode : undef,
485             );
486 1 50       86 sigprocmask(SIG_BLOCK, $sigset_all, $sigset_bak) if $chunks;
487             }
488              
489 2         11 sigprocmask(SIG_SETMASK, $sigset_bak);
490              
491 2         27 $nbytes;
492             }
493              
494             sub _kill_idlers {
495 0     0     my $plus = shift;
496              
497 0           $num_idle -= $plus;
498 0           kill 'TERM', my @idlers = (keys %idle)[0 .. --$plus];
499 0           delete @idle{@idlers};
500             }
501              
502             sub _log_child_action {
503 0     0     my ($what, $count, @more) = @_;
504 0 0         warn "$what $count child", $count == 1 ? ".\n" : "ren.\n", @more;
505             }
506              
507             sub _log_child_status {
508 0     0     warn "busy:$num_busy idle:$num_idle\n";
509             }
510              
511             1;
512              
513             __END__