File Coverage

blib/lib/Mojo/Server/Prefork.pm
Criterion Covered Total %
statement 22 128 17.1
branch 1 62 1.6
condition 0 47 0.0
subroutine 8 27 29.6
pod 4 4 100.0
total 35 268 13.0


line stmt bran cond sub pod time code
1             package Mojo::Server::Prefork;
2 3     3   505 use Mojo::Base 'Mojo::Server::Daemon';
  3         13  
  3         23  
3              
4 3     3   21 use Config;
  3         7  
  3         135  
5 3     3   18 use File::Spec::Functions qw(tmpdir);
  3         8  
  3         172  
6 3     3   20 use Mojo::File qw(path);
  3         11  
  3         148  
7 3     3   17 use Mojo::Util qw(steady_time);
  3         7  
  3         212  
8 3     3   22 use POSIX qw(WNOHANG);
  3         5  
  3         60  
9 3     3   281 use Scalar::Util qw(weaken);
  3         16  
  3         7813  
10              
11             has accepts => 10000;
12             has cleanup => 1;
13             has graceful_timeout => 120;
14             has heartbeat_timeout => 50;
15             has heartbeat_interval => 5;
16             has pid_file => sub { path(tmpdir, 'prefork.pid')->to_string };
17             has spare => 2;
18             has workers => 4;
19              
20 5 50   5   2982 sub DESTROY { path($_[0]->pid_file)->remove if $_[0]->cleanup }
21              
22             sub check_pid {
23 0 0   0 1   return undef unless -r (my $file = path(shift->pid_file));
24 0           my $pid = $file->slurp;
25 0           chomp $pid;
26              
27             # Running
28 0 0 0       return $pid if $pid && kill 0, $pid;
29              
30             # Not running
31 0           $file->remove;
32 0           return undef;
33             }
34              
35             sub ensure_pid_file {
36 0     0 1   my ($self, $pid) = @_;
37              
38             # Check if PID file already exists
39 0 0         return if -e (my $file = path($self->pid_file));
40              
41             # Create PID file
42 0 0         if (my $err = eval { $file->spew("$pid\n")->chmod(0644) } ? undef : $@) {
  0 0          
43 0 0         $self->app->log->error(qq{Can't create process id file "$file": $err})
44             and die qq{Can't create process id file "$file": $err};
45             }
46 0           $self->app->log->info(qq{Creating process id file "$file"});
47             }
48              
49             sub healthy {
50 0     0 1   scalar grep { $_->{healthy} } values %{shift->{pool}};
  0            
  0            
51             }
52              
53             sub run {
54 0     0 1   my $self = shift;
55              
56             # No fork emulation support
57 0 0 0       say 'Pre-forking does not support fork emulation.' and exit 0 if $Config{d_pseudofork};
58              
59             # Pipe for worker communication
60 0 0         pipe($self->{reader}, $self->{writer}) or die "Can't create pipe: $!";
61              
62             # Clean manager environment
63             local $SIG{CHLD} = sub {
64 0     0     while ((my $pid = waitpid -1, WNOHANG) > 0) { $self->emit(reap => $pid)->_stopped($pid) }
  0            
65 0           };
66 0     0     local $SIG{INT} = local $SIG{TERM} = sub { $self->_term };
  0            
67 0     0     local $SIG{QUIT} = sub { $self->_term(1) };
  0            
68 0     0     local $SIG{TTIN} = sub { $self->workers($self->workers + 1) };
  0            
69             local $SIG{TTOU} = sub {
70 0 0   0     $self->workers > 0 ? $self->workers($self->workers - 1) : return;
71 0 0 0       for my $w (values %{$self->{pool}}) { ($w->{graceful} = steady_time) and last unless $w->{graceful} }
  0            
  0            
72 0           };
73              
74             # Preload application before starting workers
75 0           $self->start->app->log->info("Manager $$ started");
76 0           $self->ioloop->max_accepts($self->accepts);
77 0           $self->{running} = 1;
78 0           $self->_manage while $self->{running};
79 0           $self->app->log->info("Manager $$ stopped");
80             }
81              
82 0 0   0     sub _heartbeat { shift->{writer}->syswrite("$$:$_[0]\n") or exit 0 }
83              
84             sub _manage {
85 0     0     my $self = shift;
86              
87             # Spawn more workers if necessary and check PID file
88 0 0         if (!$self->{finished}) {
    0          
89 0           my $graceful = grep { $_->{graceful} } values %{$self->{pool}};
  0            
  0            
90 0           my $spare = $self->spare;
91 0 0         $spare = $graceful ? $graceful > $spare ? $spare : $graceful : 0;
    0          
92 0           my $need = ($self->workers - keys %{$self->{pool}}) + $spare;
  0            
93 0           $self->_spawn while $need-- > 0;
94 0           $self->ensure_pid_file($$);
95             }
96              
97             # Shutdown
98 0           elsif (!keys %{$self->{pool}}) { return delete $self->{running} }
  0            
99              
100             # Wait for heartbeats
101 0           $self->_wait;
102              
103 0           my $interval = $self->heartbeat_interval;
104 0           my $ht = $self->heartbeat_timeout;
105 0           my $gt = $self->graceful_timeout;
106 0           my $log = $self->app->log;
107 0           my $time = steady_time;
108              
109 0           for my $pid (keys %{$self->{pool}}) {
  0            
110 0 0         next unless my $w = $self->{pool}{$pid};
111              
112             # No heartbeat (graceful stop)
113             $log->error("Worker $pid has no heartbeat ($ht seconds), restarting (see FAQ for more)") and $w->{graceful} = $time
114 0 0 0       if !$w->{graceful} && ($w->{time} + $interval + $ht <= $time);
      0        
115              
116             # Graceful stop with timeout
117 0 0 0       my $graceful = $w->{graceful} ||= $self->{graceful} ? $time : undef;
118             $log->info("Stopping worker $pid gracefully ($gt seconds)") and (kill 'QUIT', $pid or $self->_stopped($pid))
119 0 0 0       if $graceful && !$w->{quit}++;
      0        
      0        
120 0 0 0       $w->{force} = 1 if $graceful && $graceful + $gt <= $time;
121              
122             # Normal stop
123             $log->warn("Stopping worker $pid immediately") and (kill 'KILL', $pid or $self->_stopped($pid))
124 0 0 0       if $w->{force} || ($self->{finished} && !$graceful);
      0        
      0        
      0        
125             }
126             }
127              
128             sub _spawn {
129 0     0     my $self = shift;
130              
131             # Manager
132 0 0         die "Can't fork: $!" unless defined(my $pid = fork);
133 0 0         return $self->emit(spawn => $pid)->{pool}{$pid} = {time => steady_time} if $pid;
134              
135             # Heartbeat messages
136 0           my $loop = $self->cleanup(0)->ioloop;
137 0           my $finished = 0;
138 0     0     $loop->on(finish => sub { $finished = 1 });
  0            
139 0           weaken $self;
140 0     0     my $cb = sub { $self->_heartbeat($finished) };
  0            
141 0           $loop->next_tick($cb);
142 0           $loop->recurring($self->heartbeat_interval => $cb);
143              
144             # Clean worker environment
145 0           $SIG{$_} = 'DEFAULT' for qw(CHLD INT TERM TTIN TTOU);
146 0     0     $SIG{QUIT} = sub { $loop->stop_gracefully };
  0            
147 0     0     $loop->on(finish => sub { $self->max_requests(1) });
  0            
148 0           delete $self->{reader};
149 0           srand;
150              
151 0           $self->app->log->info("Worker $$ started");
152 0           $loop->start;
153 0           exit 0;
154             }
155              
156             sub _stopped {
157 0     0     my ($self, $pid) = @_;
158              
159 0 0         return unless my $w = delete $self->{pool}{$pid};
160              
161 0           my $log = $self->app->log;
162 0           $log->info("Worker $pid stopped");
163 0 0 0       $log->error("Worker $pid stopped too early, shutting down") and $self->_term unless $w->{healthy};
164             }
165              
166             sub _term {
167 0     0     my ($self, $graceful) = @_;
168 0           @{$self->emit(finish => $graceful)}{qw(finished graceful)} = (1, $graceful);
  0            
169             }
170              
171             sub _wait {
172 0     0     my $self = shift;
173              
174             # Poll for heartbeats
175 0           my $reader = $self->emit('wait')->{reader};
176 0 0         return unless Mojo::Util::_readable(1000, fileno($reader));
177 0 0         return unless $reader->sysread(my $chunk, 4194304);
178              
179             # Update heartbeats (and stop gracefully if necessary)
180 0           my $time = steady_time;
181 0           while ($chunk =~ /(\d+):(\d)\n/g) {
182 0 0         next unless my $w = $self->{pool}{$1};
183 0 0         @$w{qw(healthy time)} = (1, $time) and $self->emit(heartbeat => $1);
184 0 0         if ($2) {
185 0   0       $w->{graceful} ||= $time;
186 0           $w->{quit}++;
187             }
188             }
189             }
190              
191             1;
192              
193             =encoding utf8
194              
195             =head1 NAME
196              
197             Mojo::Server::Prefork - Pre-forking non-blocking I/O HTTP and WebSocket server
198              
199             =head1 SYNOPSIS
200              
201             use Mojo::Server::Prefork;
202              
203             my $prefork = Mojo::Server::Prefork->new(listen => ['http://*:8080']);
204             $prefork->unsubscribe('request')->on(request => sub ($prefork, $tx) {
205              
206             # Request
207             my $method = $tx->req->method;
208             my $path = $tx->req->url->path;
209              
210             # Response
211             $tx->res->code(200);
212             $tx->res->headers->content_type('text/plain');
213             $tx->res->body("$method request for $path!");
214              
215             # Resume transaction
216             $tx->resume;
217             });
218             $prefork->run;
219              
220             =head1 DESCRIPTION
221              
222             L<Mojo::Server::Prefork> is a full featured, UNIX optimized, pre-forking non-blocking I/O HTTP and WebSocket server,
223             built around the very well tested and reliable L<Mojo::Server::Daemon>, with IPv6, TLS, SNI, UNIX domain socket, Comet
224             (long polling), keep-alive and multiple event loop support. Note that the server uses signals for process management,
225             so you should avoid modifying signal handlers in your applications.
226              
227             For better scalability (epoll, kqueue) and to provide non-blocking name resolution, SOCKS5 as well as TLS support, the
228             optional modules L<EV> (4.32+), L<Net::DNS::Native> (0.15+), L<IO::Socket::Socks> (0.64+) and L<IO::Socket::SSL>
229             (1.84+) will be used automatically if possible. Individual features can also be disabled with the C<MOJO_NO_NNR>,
230             C<MOJO_NO_SOCKS> and C<MOJO_NO_TLS> environment variables.
231              
232             See L<Mojolicious::Guides::Cookbook/"DEPLOYMENT"> for more.
233              
234             =head1 MANAGER SIGNALS
235              
236             The L<Mojo::Server::Prefork> manager process can be controlled at runtime with the following signals.
237              
238             =head2 INT, TERM
239              
240             Shut down server immediately.
241              
242             =head2 QUIT
243              
244             Shut down server gracefully.
245              
246             =head2 TTIN
247              
248             Increase worker pool by one.
249              
250             =head2 TTOU
251              
252             Decrease worker pool by one.
253              
254             =head1 WORKER SIGNALS
255              
256             L<Mojo::Server::Prefork> worker processes can be controlled at runtime with the following signals.
257              
258             =head2 QUIT
259              
260             Stop worker gracefully.
261              
262             =head1 EVENTS
263              
264             L<Mojo::Server::Prefork> inherits all events from L<Mojo::Server::Daemon> and can emit the following new ones.
265              
266             =head2 finish
267              
268             $prefork->on(finish => sub ($prefork, $graceful) {...});
269              
270             Emitted when the server shuts down.
271              
272             $prefork->on(finish => sub ($prefork, $graceful) {
273             say $graceful ? 'Graceful server shutdown' : 'Server shutdown';
274             });
275              
276             =head2 heartbeat
277              
278             $prefork->on(heartbeat => sub ($prefork, $pid) {...});
279              
280             Emitted when a heartbeat message has been received from a worker.
281              
282             $prefork->on(heartbeat => sub ($prefork, $pid) { say "Worker $pid has a heartbeat" });
283              
284             =head2 reap
285              
286             $prefork->on(reap => sub ($prefork, $pid) {...});
287              
288             Emitted when a child process exited.
289              
290             $prefork->on(reap => sub ($prefork, $pid) { say "Worker $pid stopped" });
291              
292             =head2 spawn
293              
294             $prefork->on(spawn => sub ($prefork, $pid) {...});
295              
296             Emitted when a worker process is spawned.
297              
298             $prefork->on(spawn => sub ($prefork, $pid) { say "Worker $pid started" });
299              
300             =head2 wait
301              
302             $prefork->on(wait => sub ($prefork) {...});
303              
304             Emitted when the manager starts waiting for new heartbeat messages.
305              
306             $prefork->on(wait => sub ($prefork) {
307             my $workers = $prefork->workers;
308             say "Waiting for heartbeat messages from $workers workers";
309             });
310              
311             =head1 ATTRIBUTES
312              
313             L<Mojo::Server::Prefork> inherits all attributes from L<Mojo::Server::Daemon> and implements the following new ones.
314              
315             =head2 accepts
316              
317             my $accepts = $prefork->accepts;
318             $prefork = $prefork->accepts(100);
319              
320             Maximum number of connections a worker is allowed to accept, before stopping gracefully and then getting replaced with
321             a newly started worker, passed along to L<Mojo::IOLoop/"max_accepts">, defaults to C<10000>. Setting the value to C<0>
322             will allow workers to accept new connections indefinitely. Note that up to half of this value can be subtracted
323             randomly to improve load balancing, and to make sure that not all workers restart at the same time.
324              
325             =head2 cleanup
326              
327             my $bool = $prefork->cleanup;
328             $prefork = $prefork->cleanup($bool);
329              
330             Delete L</"pid_file"> automatically once it is not needed anymore, defaults to a true value.
331              
332             =head2 graceful_timeout
333              
334             my $timeout = $prefork->graceful_timeout;
335             $prefork = $prefork->graceful_timeout(15);
336              
337             Maximum amount of time in seconds stopping a worker gracefully may take before being forced, defaults to C<120>. Note
338             that this value should usually be a little larger than the maximum amount of time you expect any one request to take.
339              
340             =head2 heartbeat_interval
341              
342             my $interval = $prefork->heartbeat_interval;
343             $prefork = $prefork->heartbeat_interval(3);
344              
345             Heartbeat interval in seconds, defaults to C<5>.
346              
347             =head2 heartbeat_timeout
348              
349             my $timeout = $prefork->heartbeat_timeout;
350             $prefork = $prefork->heartbeat_timeout(2);
351              
352             Maximum amount of time in seconds before a worker without a heartbeat will be stopped gracefully, defaults to C<50>.
353             Note that this value should usually be a little larger than the maximum amount of time you expect any one operation to
354             block the event loop.
355              
356             =head2 pid_file
357              
358             my $file = $prefork->pid_file;
359             $prefork = $prefork->pid_file('/tmp/prefork.pid');
360              
361             Full path of process id file, defaults to C<prefork.pid> in a temporary directory.
362              
363             =head2 spare
364              
365             my $spare = $prefork->spare;
366             $prefork = $prefork->spare(4);
367              
368             Temporarily spawn up to this number of additional workers if there is a need, defaults to C<2>. This allows for new
369             workers to be started while old ones are still shutting down gracefully, drastically reducing the performance cost of
370             worker restarts.
371              
372             =head2 workers
373              
374             my $workers = $prefork->workers;
375             $prefork = $prefork->workers(10);
376              
377             Number of worker processes, defaults to C<4>. A good rule of thumb is two worker processes per CPU core for
378             applications that perform mostly non-blocking operations, blocking operations often require more and benefit from
379             decreasing concurrency with L<Mojo::Server::Daemon/"max_clients"> (often as low as C<1>).
380              
381             =head1 METHODS
382              
383             L<Mojo::Server::Prefork> inherits all methods from L<Mojo::Server::Daemon> and implements the following new ones.
384              
385             =head2 check_pid
386              
387             my $pid = $prefork->check_pid;
388              
389             Get process id for running server from L</"pid_file"> or delete it if server is not running.
390              
391             say 'Server is not running' unless $prefork->check_pid;
392              
393             =head2 ensure_pid_file
394              
395             $prefork->ensure_pid_file($pid);
396              
397             Ensure L</"pid_file"> exists.
398              
399             =head2 healthy
400              
401             my $healthy = $prefork->healthy;
402              
403             Number of currently active worker processes with a heartbeat.
404              
405             =head2 run
406              
407             $prefork->run;
408              
409             Run server and wait for L</"MANAGER SIGNALS">.
410              
411             =head1 SEE ALSO
412              
413             L<Mojolicious>, L<Mojolicious::Guides>, L<https://mojolicious.org>.
414              
415             =cut