File Coverage

blib/lib/Gearman/Spawner/Supervisor.pm
Criterion Covered Total %
statement 75 76 98.6
branch 19 24 79.1
condition 4 7 57.1
subroutine 15 15 100.0
pod 0 7 0.0
total 113 129 87.6


line stmt bran cond sub pod time code
1             package Gearman::Spawner::Supervisor;
2              
3 26     26   2395 use strict;
  24         44  
  24         787  
4 24     24   113 use warnings;
  24         43  
  24         553  
5              
6 24     24   14708 use Gearman::Spawner::Process;
  24         80  
  24         21630  
7              
8 69     69 0 1912 sub process { Gearman::Spawner::Process->instance }
9              
10             # forks a child process to run the manager in, then brings up requested workers
11             # returns the pid of the forked supervisor the "workers" parameter is a hash of
12             # worker class names to their respective startup arguments
13             sub start {
14 22     22 0 190 my $class = shift;
15              
16 22         259 my $supervisor = $class->new(@_); # new takes the same parameters as start
17              
18             # try loading modules before fork so obvious compile errors get reported to
19             # caller
20 22         160 $supervisor->try_load_modules;
21              
22 13         1976 my $pid = process->fork("[Gearman::Spawner] $0", 1);
23 13 100       655 return $pid if $pid;
24              
25 9         182 $supervisor->spawn;
26              
27 6         96 process->loop;
28              
29 0         0 die "manager exited unexpectedly";
30             }
31              
32             sub new {
33 22     22 0 49 my $class = shift;
34 22         386 return bless {
35             # allowed parameters: servers, workers, preload
36             @_,
37             _pid => $$,
38             }, $class;
39             }
40              
41             # launch workers in subprocesses
42             sub spawn {
43 9     9 0 70 my $self = shift;
44              
45 9         149 $self->load_modules;
46              
47 9         81 my $workers = $self->{workers};
48 9         84 for my $class (keys %$workers) {
49 8         22 my $config = $workers->{$class};
50              
51 8   100     88 my $count = delete $config->{count} || 1;
52              
53 8         32 for my $n (1 .. $count) {
54 8         24 my $slot = $n;
55              
56             my $handle = process->maintain_subprocess(sub {
57 13     13   160 $self->start_worker(
58             servers => $self->{servers},
59             class => $class,
60             slot => $slot,
61             %$config,
62             );
63 8         42 });
64 5         1280 $self->{_handles}{"$class #$n"} = $handle;
65             }
66             }
67              
68 6         59 return $self;
69             }
70              
71             sub load_modules {
72 18     18 0 118 my $self = shift;
73              
74 18 50       3810 return if $self->{_loaded}++;
75              
76 18   50     580 my $preload = $self->{preload} || [];
77 18         100 my $workers = $self->{workers};
78              
79 18         574 for my $module (@$preload, keys %$workers) {
80 16     14   10853 my $ok = eval qq{use $module; 1;};
  14         18995  
  11         2695  
  11         241  
81 16 100       1128 $@ && die $@;
82 11 50       121 $ok || die "$module didn't return a true value";
83             }
84             }
85              
86             sub try_load_modules {
87 22     22 0 69 my $self = shift;
88              
89             # run test in a subprocess so parent doesn't incur memory overhead of
90             # modules it may not use
91 22         128 my $pid = process->fork("module test");
92              
93 22 100       1480 if (!$pid) {
94 6         180 eval { $self->load_modules };
  6         462  
95 6 100       96 exit 1 if $@;
96 4         155 exit 0;
97             }
98              
99 16         9405804 waitpid $pid, 0;
100 16 100       981 if ($?) {
101             # something unknown failed in child loader, repeat loading in this
102             # process to expose error to caller
103 3         313 $self->load_modules;
104             }
105             }
106              
107             # fork a worker process and start grabbing jobs
108             sub start_worker {
109 13     13 0 25 my $self = shift;
110 13         141 my %params = @_;
111              
112 13         67 my $supervisor_pid = $$;
113              
114 13         47 my $pid = process->fork("$params{class}-worker #$params{slot}");
115 13 100       2713 return $pid if $pid;
116              
117 5         746 my $worker = $params{class}->new(
118             $params{servers}, $params{slot}, $params{data}
119             );
120              
121 5         79 my $quitting = 0;
122 5         17 my $jobs_done = 0;
123 5     3   280 $SIG{INT} = $SIG{TERM} = sub { $quitting = 1 };
  3         959678  
124 5         161 while (!$quitting) {
125 45         116 eval {
126 45     43   428 $worker->work(stop_if => sub {1});
  43         95031  
127             };
128 43 50       650 $@ && warn "$params{class} [$$] failed: $@";
129 43         212 $jobs_done++;
130              
131             # bail if supervisor went away
132 43 50       358 $quitting++ if getppid != $supervisor_pid;
133 43 50 33     273 $quitting++ if $params{max_jobs} && $jobs_done > $params{max_jobs};
134             }
135 3         934 exit 0;
136             }
137              
138             sub DESTROY {
139 14     14   356 my $self = shift;
140 14 100       2810 return unless $self->{_pid} == $$;
141 7         139 process->kill_maintained(values %{ $self->{_handles} });
  7         278  
142             }
143              
144             1;