File Coverage

blib/lib/Gearman/Spawner/Process.pm
Criterion Covered Total %
statement 79 85 92.9
branch 13 22 59.0
condition 3 6 50.0
subroutine 21 22 95.4
pod 0 10 0.0
total 116 145 80.0


line stmt bran cond sub pod time code
1             package Gearman::Spawner::Process;
2              
3             # class that encapsulates things of which there should only be one per process,
4             # like signal handlers
5              
6 25     25   124 use strict;
  25         50  
  25         1987  
7 25     25   128 use warnings;
  25         70  
  25         640  
8              
9 25     25   145 use Carp qw( croak );
  25         290  
  25         1230  
10 25     25   26854 use Danga::Socket ();
  25         1025313  
  25         1045  
11 25     25   242 use POSIX qw( WNOHANG );
  25         51  
  25         214  
12              
13             our $CHECK_PERIOD = 5;
14              
15             my $INSTANCE;
16             sub instance {
17 69     69 0 424 my $class = shift;
18 69   66     1725 return $INSTANCE ||= $class->new();
19             }
20              
21             sub new {
22 20     20 0 126 my $class = shift;
23              
24 20 50       115 die "new can only be called once" if $INSTANCE;
25              
26 20         250 my $self = bless {
27             next_handle => 1,
28             }, $class;
29              
30 20     23   461 my $reaper = sub { $self->reap; 1 };
  23         1345  
  21         157  
31              
32 20     4   2876 $SIG{INT} = $SIG{TERM} = sub { $self->quit };
  4         580601  
33              
34             # after a child dies, collect children on next pass through event loop
35 20     23   549 $SIG{CHLD} = sub { Danga::Socket->AddTimer(0, $reaper); };
  23         1505891  
36              
37             # ... also check for dead children periodically on a timer to work around signal races
38 20         775 $self->run_periodically($reaper);
39              
40 20         827 return $self;
41             }
42              
43             sub reap {
44 23     23 0 50 my $self = shift;
45 23         449 while ((my $pid = waitpid(-1, WNOHANG)) > 0) {
46 5         51 my $cb = delete $self->{_chld_actions}{$pid};
47 5 50       49 $cb->() if $cb;
48             }
49             }
50              
51             sub fork {
52 69     69 0 216 my $self = shift;
53 69         304 my $child_name = shift;
54 69         178 my $clingy = shift;
55              
56 69         303 my $parent_pid = $$;
57              
58 69   33     137604 my $pid = CORE::fork() // croak "failed to fork: $!";
59 69 100       8559 return $pid if $pid; # parent
60              
61             # child
62 20 100       1838 $self->exit_with_parent($parent_pid) if $clingy;
63              
64 20         3415 $0 = $child_name;
65              
66 20         1252 return $pid;
67             }
68              
69             sub exit_with_parent {
70 9     9 0 224 my $self = shift;
71 9         149 my $orig_ppid = shift;
72 9         540 my $orig_pid = $$;
73              
74 9 50       1326 if (getppid != $orig_ppid) {
75 0         0 warn "Exiting immediately because parent changed\n";
76 0         0 exit;
77             }
78             $self->run_periodically(sub {
79 13 50   13   208 return 0 if $$ != $orig_pid; # stop checking if we're a child process of whichever set this
80 13 50       96 exit if getppid != $orig_ppid;
81 13         39 1;
82 9         1921 });
83             }
84              
85             sub maintain_subprocess {
86 8     8 0 26 my $self = shift;
87 8         16 my $process_creator = shift; # subref that returns a pid
88              
89 8         30 my $handle = $self->{next_handle}++;
90              
91 8         22 my $recreator;
92             $recreator = sub {
93 13     13   64 my $pid = $process_creator->();
94 8         1136 $self->{_pid_for_handle}{$handle} = $pid;
95 8         207 $self->{_chld_actions}{$pid} = $recreator;
96 8         104 };
97 8         30 $recreator->();
98              
99 5         96 return $handle;
100             }
101              
102             # takes one or more handles returned from maintain_subprocess and kills the associated processes
103             sub kill_maintained {
104 7     7 0 232 my $self = shift;
105 7         38 my @handles = @_;
106              
107 7         478 kill 'INT', grep { defined } map { $self->{_pid_for_handle}{$_} } @handles;
  0         0  
  0         0  
108             }
109              
110             # takes a subref and runs it every $CHECK_PERIOD seconds, starting on the next pass
111             # through the event loop. if the sub returns a false value, it will not be run again.
112             sub run_periodically {
113 29     29 0 372 my $self = shift;
114 29         295 my $sub = shift;
115 29         57 my $recycler;
116             $recycler = sub {
117 26     26   3138977 my $again = $sub->();
118 26 50       101 if ($again) {
119 26         298 Danga::Socket->AddTimer($CHECK_PERIOD, $recycler);
120             }
121 29         475 };
122 29         913 Danga::Socket->AddTimer(0, $recycler);
123             }
124              
125             sub loop {
126 6     6 0 24 my $self = shift;
127 6         834 Danga::Socket->EventLoop;
128             }
129              
130             sub quit {
131 4     4 0 23 my $self = shift;
132 4 50       103 return if $self->{_quitting}++;
133              
134 4         18 my @children = keys %{ $self->{_chld_actions} };
  4         89  
135 4 100       259 kill 'INT', @children if @children;
136 4         1062 exit;
137             }
138              
139             sub DESTROY {
140 0 0   0     return unless $INSTANCE;
141 0           $INSTANCE->quit;
142             }
143              
144             1;