File Coverage

blib/lib/Mojo/Server/Prefork.pm
Criterion Covered Total %
statement 22 126 17.4
branch 1 62 1.6
condition 0 47 0.0
subroutine 8 27 29.6
pod 4 4 100.0
total 35 266 13.1


line stmt bran cond sub pod time code
1             package Mojo::Server::Prefork;
2 3     3   478 use Mojo::Base 'Mojo::Server::Daemon';
  3         8  
  3         18  
3              
4 3     3   24 use Config;
  3         6  
  3         142  
5 3     3   22 use File::Spec::Functions qw(tmpdir);
  3         7  
  3         176  
6 3     3   21 use Mojo::File qw(path);
  3         7  
  3         132  
7 3     3   19 use Mojo::Util qw(steady_time);
  3         6  
  3         144  
8 3     3   22 use POSIX qw(WNOHANG);
  3         7  
  3         55  
9 3     3   265 use Scalar::Util qw(weaken);
  3         8  
  3         7276  
10              
11             has accepts => 10000;
12             has cleanup => 1;
13             has graceful_timeout => 120;
14             has heartbeat_timeout => 50;
15             has heartbeat_interval => 5;
16             has pid_file => sub { path(tmpdir, 'prefork.pid')->to_string };
17             has spare => 2;
18             has workers => 4;
19              
20 5 50   5   2587 sub DESTROY { path($_[0]->pid_file)->remove if $_[0]->cleanup }
21              
22             sub check_pid {
23 0 0   0 1   return undef unless -r (my $file = path(shift->pid_file));
24 0           my $pid = $file->slurp;
25 0           chomp $pid;
26              
27             # Running
28 0 0 0       return $pid if $pid && kill 0, $pid;
29              
30             # Not running
31 0           $file->remove;
32 0           return undef;
33             }
34              
35             sub ensure_pid_file {
36 0     0 1   my ($self, $pid) = @_;
37              
38             # Check if PID file already exists
39 0 0         return if -e (my $file = path($self->pid_file));
40              
41             # Create PID file
42 0 0         if (my $err = eval { $file->spurt("$pid\n")->chmod(0644) } ? undef : $@) {
  0 0          
43 0 0         $self->app->log->error(qq{Can't create process id file "$file": $err})
44             and die qq{Can't create process id file "$file": $err};
45             }
46 0           $self->app->log->info(qq{Creating process id file "$file"});
47             }
48              
49             sub healthy {
50 0     0 1   scalar grep { $_->{healthy} } values %{shift->{pool}};
  0            
  0            
51             }
52              
53             sub run {
54 0     0 1   my $self = shift;
55              
56             # No fork emulation support
57 0 0 0       say 'Pre-forking does not support fork emulation.' and exit 0 if $Config{d_pseudofork};
58              
59             # Pipe for worker communication
60 0 0         pipe($self->{reader}, $self->{writer}) or die "Can't create pipe: $!";
61              
62             # Clean manager environment
63             local $SIG{CHLD} = sub {
64 0     0     while ((my $pid = waitpid -1, WNOHANG) > 0) { $self->emit(reap => $pid)->_stopped($pid) }
  0            
65 0           };
66 0     0     local $SIG{INT} = local $SIG{TERM} = sub { $self->_term };
  0            
67 0     0     local $SIG{QUIT} = sub { $self->_term(1) };
  0            
68 0     0     local $SIG{TTIN} = sub { $self->workers($self->workers + 1) };
  0            
69             local $SIG{TTOU} = sub {
70 0 0   0     $self->workers > 0 ? $self->workers($self->workers - 1) : return;
71 0 0 0       for my $w (values %{$self->{pool}}) { ($w->{graceful} = steady_time) and last unless $w->{graceful} }
  0            
  0            
72 0           };
73              
74             # Preload application before starting workers
75 0           $self->start->app->log->info("Manager $$ started");
76 0           $self->ioloop->max_accepts($self->accepts);
77 0           $self->{running} = 1;
78 0           $self->_manage while $self->{running};
79 0           $self->app->log->info("Manager $$ stopped");
80             }
81              
82 0 0   0     sub _heartbeat { shift->{writer}->syswrite("$$:$_[0]\n") or exit 0 }
83              
84             sub _manage {
85 0     0     my $self = shift;
86              
87             # Spawn more workers if necessary and check PID file
88 0 0         if (!$self->{finished}) {
    0          
89 0           my $graceful = grep { $_->{graceful} } values %{$self->{pool}};
  0            
  0            
90 0           my $spare = $self->spare;
91 0 0         $spare = $graceful ? $graceful > $spare ? $spare : $graceful : 0;
    0          
92 0           my $need = ($self->workers - keys %{$self->{pool}}) + $spare;
  0            
93 0           $self->_spawn while $need-- > 0;
94 0           $self->ensure_pid_file($$);
95             }
96              
97             # Shutdown
98 0           elsif (!keys %{$self->{pool}}) { return delete $self->{running} }
  0            
99              
100             # Wait for heartbeats
101 0           $self->_wait;
102              
103 0           my $interval = $self->heartbeat_interval;
104 0           my $ht = $self->heartbeat_timeout;
105 0           my $gt = $self->graceful_timeout;
106 0           my $log = $self->app->log;
107 0           my $time = steady_time;
108              
109 0           for my $pid (keys %{$self->{pool}}) {
  0            
110 0 0         next unless my $w = $self->{pool}{$pid};
111              
112             # No heartbeat (graceful stop)
113             $log->error("Worker $pid has no heartbeat ($ht seconds), restarting") and $w->{graceful} = $time
114 0 0 0       if !$w->{graceful} && ($w->{time} + $interval + $ht <= $time);
      0        
115              
116             # Graceful stop with timeout
117 0 0 0       my $graceful = $w->{graceful} ||= $self->{graceful} ? $time : undef;
118             $log->info("Stopping worker $pid gracefully ($gt seconds)") and (kill 'QUIT', $pid or $self->_stopped($pid))
119 0 0 0       if $graceful && !$w->{quit}++;
      0        
      0        
120 0 0 0       $w->{force} = 1 if $graceful && $graceful + $gt <= $time;
121              
122             # Normal stop
123             $log->warn("Stopping worker $pid immediately") and (kill 'KILL', $pid or $self->_stopped($pid))
124 0 0 0       if $w->{force} || ($self->{finished} && !$graceful);
      0        
      0        
      0        
125             }
126             }
127              
128             sub _spawn {
129 0     0     my $self = shift;
130              
131             # Manager
132 0 0         die "Can't fork: $!" unless defined(my $pid = fork);
133 0 0         return $self->emit(spawn => $pid)->{pool}{$pid} = {time => steady_time} if $pid;
134              
135             # Heartbeat messages
136 0           my $loop = $self->cleanup(0)->ioloop;
137 0           my $finished = 0;
138 0     0     $loop->on(finish => sub { $finished = 1 });
  0            
139 0           weaken $self;
140 0     0     my $cb = sub { $self->_heartbeat($finished) };
  0            
141 0           $loop->next_tick($cb);
142 0           $loop->recurring($self->heartbeat_interval => $cb);
143              
144             # Clean worker environment
145 0           $SIG{$_} = 'DEFAULT' for qw(CHLD INT TERM TTIN TTOU);
146 0     0     $SIG{QUIT} = sub { $loop->stop_gracefully };
  0            
147 0     0     $loop->on(finish => sub { $self->max_requests(1) });
  0            
148 0           delete $self->{reader};
149 0           srand;
150              
151 0           $self->app->log->info("Worker $$ started");
152 0           $loop->start;
153 0           exit 0;
154             }
155              
156             sub _stopped {
157 0     0     my ($self, $pid) = @_;
158              
159 0 0         return unless my $w = delete $self->{pool}{$pid};
160              
161 0           my $log = $self->app->log;
162 0           $log->info("Worker $pid stopped");
163 0 0 0       $log->error("Worker $pid stopped too early, shutting down") and $self->_term unless $w->{healthy};
164             }
165              
166             sub _term {
167 0     0     my ($self, $graceful) = @_;
168 0           @{$self->emit(finish => $graceful)}{qw(finished graceful)} = (1, $graceful);
  0            
169             }
170              
171             sub _wait {
172 0     0     my $self = shift;
173              
174             # Poll for heartbeats
175 0           my $reader = $self->emit('wait')->{reader};
176 0 0         return unless Mojo::Util::_readable(1000, fileno($reader));
177 0 0         return unless $reader->sysread(my $chunk, 4194304);
178              
179             # Update heartbeats (and stop gracefully if necessary)
180 0           my $time = steady_time;
181 0           while ($chunk =~ /(\d+):(\d)\n/g) {
182 0 0         next unless my $w = $self->{pool}{$1};
183 0 0         @$w{qw(healthy time)} = (1, $time) and $self->emit(heartbeat => $1);
184 0 0 0       $w->{graceful} ||= $time if $2;
185             }
186             }
187              
188             1;
189              
190             =encoding utf8
191              
192             =head1 NAME
193              
194             Mojo::Server::Prefork - Pre-forking non-blocking I/O HTTP and WebSocket server
195              
196             =head1 SYNOPSIS
197              
198             use Mojo::Server::Prefork;
199              
200             my $prefork = Mojo::Server::Prefork->new(listen => ['http://*:8080']);
201             $prefork->unsubscribe('request')->on(request => sub ($prefork, $tx) {
202              
203             # Request
204             my $method = $tx->req->method;
205             my $path = $tx->req->url->path;
206              
207             # Response
208             $tx->res->code(200);
209             $tx->res->headers->content_type('text/plain');
210             $tx->res->body("$method request for $path!");
211              
212             # Resume transaction
213             $tx->resume;
214             });
215             $prefork->run;
216              
217             =head1 DESCRIPTION
218              
219             L is a full featured, UNIX optimized, pre-forking non-blocking I/O HTTP and WebSocket server,
220             built around the very well tested and reliable L, with IPv6, TLS, SNI, UNIX domain socket, Comet
221             (long polling), keep-alive and multiple event loop support. Note that the server uses signals for process management,
222             so you should avoid modifying signal handlers in your applications.
223              
224             For better scalability (epoll, kqueue) and to provide non-blocking name resolution, SOCKS5 as well as TLS support, the
225             optional modules L (4.32+), L (0.15+), L (0.64+) and L
226             (1.84+) will be used automatically if possible. Individual features can also be disabled with the C,
227             C and C environment variables.
228              
229             See L for more.
230              
231             =head1 MANAGER SIGNALS
232              
233             The L manager process can be controlled at runtime with the following signals.
234              
235             =head2 INT, TERM
236              
237             Shut down server immediately.
238              
239             =head2 QUIT
240              
241             Shut down server gracefully.
242              
243             =head2 TTIN
244              
245             Increase worker pool by one.
246              
247             =head2 TTOU
248              
249             Decrease worker pool by one.
250              
251             =head1 WORKER SIGNALS
252              
253             L worker processes can be controlled at runtime with the following signals.
254              
255             =head2 QUIT
256              
257             Stop worker gracefully.
258              
259             =head1 EVENTS
260              
261             L inherits all events from L and can emit the following new ones.
262              
263             =head2 finish
264              
265             $prefork->on(finish => sub ($prefork, $graceful) {...});
266              
267             Emitted when the server shuts down.
268              
269             $prefork->on(finish => sub ($prefork, $graceful) {
270             say $graceful ? 'Graceful server shutdown' : 'Server shutdown';
271             });
272              
273             =head2 heartbeat
274              
275             $prefork->on(heartbeat => sub ($prefork, $pid) {...});
276              
277             Emitted when a heartbeat message has been received from a worker.
278              
279             $prefork->on(heartbeat => sub ($prefork, $pid) { say "Worker $pid has a heartbeat" });
280              
281             =head2 reap
282              
283             $prefork->on(reap => sub ($prefork, $pid) {...});
284              
285             Emitted when a child process exited.
286              
287             $prefork->on(reap => sub ($prefork, $pid) { say "Worker $pid stopped" });
288              
289             =head2 spawn
290              
291             $prefork->on(spawn => sub ($prefork, $pid) {...});
292              
293             Emitted when a worker process is spawned.
294              
295             $prefork->on(spawn => sub ($prefork, $pid) { say "Worker $pid started" });
296              
297             =head2 wait
298              
299             $prefork->on(wait => sub ($prefork) {...});
300              
301             Emitted when the manager starts waiting for new heartbeat messages.
302              
303             $prefork->on(wait => sub ($prefork) {
304             my $workers = $prefork->workers;
305             say "Waiting for heartbeat messages from $workers workers";
306             });
307              
308             =head1 ATTRIBUTES
309              
310             L inherits all attributes from L and implements the following new ones.
311              
312             =head2 accepts
313              
314             my $accepts = $prefork->accepts;
315             $prefork = $prefork->accepts(100);
316              
317             Maximum number of connections a worker is allowed to accept, before stopping gracefully and then getting replaced with
318             a newly started worker, passed along to L, defaults to C<10000>. Setting the value to C<0>
319             will allow workers to accept new connections indefinitely. Note that up to half of this value can be subtracted
320             randomly to improve load balancing, and to make sure that not all workers restart at the same time.
321              
322             =head2 cleanup
323              
324             my $bool = $prefork->cleanup;
325             $prefork = $prefork->cleanup($bool);
326              
327             Delete L automatically once it is not needed anymore, defaults to a true value.
328              
329             =head2 graceful_timeout
330              
331             my $timeout = $prefork->graceful_timeout;
332             $prefork = $prefork->graceful_timeout(15);
333              
334             Maximum amount of time in seconds stopping a worker gracefully may take before being forced, defaults to C<120>. Note
335             that this value should usually be a little larger than the maximum amount of time you expect any one request to take.
336              
337             =head2 heartbeat_interval
338              
339             my $interval = $prefork->heartbeat_interval;
340             $prefork = $prefork->heartbeat_interval(3);
341              
342             Heartbeat interval in seconds, defaults to C<5>.
343              
344             =head2 heartbeat_timeout
345              
346             my $timeout = $prefork->heartbeat_timeout;
347             $prefork = $prefork->heartbeat_timeout(2);
348              
349             Maximum amount of time in seconds before a worker without a heartbeat will be stopped gracefully, defaults to C<50>.
350             Note that this value should usually be a little larger than the maximum amount of time you expect any one operation to
351             block the event loop.
352              
353             =head2 pid_file
354              
355             my $file = $prefork->pid_file;
356             $prefork = $prefork->pid_file('/tmp/prefork.pid');
357              
358             Full path of process id file, defaults to C in a temporary directory.
359              
360             =head2 spare
361              
362             my $spare = $prefork->spare;
363             $prefork = $prefork->spare(4);
364              
365             Temporarily spawn up to this number of additional workers if there is a need, defaults to C<2>. This allows for new
366             workers to be started while old ones are still shutting down gracefully, drastically reducing the performance cost of
367             worker restarts.
368              
369             =head2 workers
370              
371             my $workers = $prefork->workers;
372             $prefork = $prefork->workers(10);
373              
374             Number of worker processes, defaults to C<4>. A good rule of thumb is two worker processes per CPU core for
375             applications that perform mostly non-blocking operations, blocking operations often require more and benefit from
376             decreasing concurrency with L (often as low as C<1>).
377              
378             =head1 METHODS
379              
380             L inherits all methods from L and implements the following new ones.
381              
382             =head2 check_pid
383              
384             my $pid = $prefork->check_pid;
385              
386             Get process id for running server from L or delete it if server is not running.
387              
388             say 'Server is not running' unless $prefork->check_pid;
389              
390             =head2 ensure_pid_file
391              
392             $prefork->ensure_pid_file($pid);
393              
394             Ensure L exists.
395              
396             =head2 healthy
397              
398             my $healthy = $prefork->healthy;
399              
400             Number of currently active worker processes with a heartbeat.
401              
402             =head2 run
403              
404             $prefork->run;
405              
406             Run server and wait for L.
407              
408             =head1 SEE ALSO
409              
410             L, L, L.
411              
412             =cut