File Coverage

blib/lib/Mojo/Server/Daemon.pm
Criterion Covered Total %
statement 158 177 89.2
branch 55 76 72.3
condition 22 36 61.1
subroutine 25 30 83.3
pod 4 4 100.0
total 264 323 81.7


line stmt bran cond sub pod time code
1             package Mojo::Server::Daemon;
2 53     53   4991 use Mojo::Base 'Mojo::Server';
  53         141  
  53         349  
3              
4 53     53   432 use Carp qw(croak);
  53         139  
  53         2701  
5 53     53   956 use Mojo::IOLoop;
  53         159  
  53         466  
6 53     53   26641 use Mojo::Transaction::WebSocket;
  53         219  
  53         623  
7 53     53   393 use Mojo::URL;
  53         148  
  53         301  
8 53     53   306 use Mojo::Util qw(term_escape);
  53         181  
  53         2579  
9 53     53   337 use Mojo::WebSocket qw(server_handshake);
  53         155  
  53         2212  
10 53     53   367 use Scalar::Util qw(weaken);
  53         185  
  53         3368  
11              
12 53   50 53   360 use constant DEBUG => $ENV{MOJO_SERVER_DEBUG} || 0;
  53         153  
  53         184305  
13              
14             has acceptors => sub { [] };
15             has [qw(backlog max_clients silent)];
16             has inactivity_timeout => sub { $ENV{MOJO_INACTIVITY_TIMEOUT} // 30 };
17             has ioloop => sub { Mojo::IOLoop->singleton };
18             has keep_alive_timeout => sub { $ENV{MOJO_KEEP_ALIVE_TIMEOUT} // 5 };
19             has listen => sub { [split /,/, $ENV{MOJO_LISTEN} || 'http://*:3000'] };
20             has max_requests => 100;
21              
22             sub DESTROY {
23 130     130   10705 my $self = shift;
24 130 50       4772 return if ${^GLOBAL_PHASE} eq 'DESTRUCT';
25 130         493 my $loop = $self->ioloop;
26 130   100     298 $loop->remove($_) for keys %{$self->{connections} // {}}, @{$self->acceptors};
  130         946  
  130         545  
27             }
28              
29 132     132 1 401 sub ports { [map { $_[0]->ioloop->acceptor($_)->port } @{$_[0]->acceptors}] }
  132         482  
  132         468  
30              
31             sub run {
32 0     0 1 0 my $self = shift;
33              
34             # Make sure the event loop can be stopped in regular intervals
35 0         0 my $loop = $self->ioloop;
36 0     0   0 my $int = $loop->recurring(1 => sub { });
37 0     0   0 local $SIG{INT} = local $SIG{TERM} = sub { $loop->stop };
  0         0  
38 0         0 $self->start->ioloop->start;
39 0         0 $loop->remove($int);
40             }
41              
42             sub start {
43 138     138 1 334 my $self = shift;
44              
45 138         474 my $loop = $self->ioloop;
46 138 100       529 if (my $max = $self->max_clients) { $loop->max_connections($max) }
  4         30  
47              
48             # Resume accepting connections
49 138 100       495 if (my $servers = $self->{servers}) {
    100          
50 1         17 push @{$self->acceptors}, $loop->acceptor(delete $servers->{$_}) for keys %$servers;
  1         3  
51             }
52              
53             # Start listening
54 137         501 elsif (!@{$self->acceptors}) {
55 136         458 $self->app->server($self);
56 136         289 $self->_listen($_) for @{$self->listen};
  136         548  
57             }
58              
59 136         724 return $self;
60             }
61              
62             sub stop {
63 1     1 1 4 my $self = shift;
64              
65             # Suspend accepting connections but keep listen sockets open
66 1         5 my $loop = $self->ioloop;
67 1         5 while (my $id = shift @{$self->acceptors}) {
  2         6  
68 1         4 my $server = $self->{servers}{$id} = $loop->acceptor($id);
69 1         6 $loop->remove($id);
70 1         5 $server->stop;
71             }
72              
73 1         13 return $self;
74             }
75              
76             sub _build_tx {
77 936     936   2330 my ($self, $id, $c) = @_;
78              
79 936         4062 my $tx = $self->build_tx->connection($id);
80 936         3363 $tx->res->headers->server('Mojolicious (Perl)');
81 936         3460 my $handle = $self->ioloop->stream($id)->timeout($self->inactivity_timeout)->handle;
82 936 50       6618 unless ($handle->isa('IO::Socket::UNIX')) {
83 936         3971 $tx->local_address($handle->sockhost)->local_port($handle->sockport);
84 936         4142 $tx->remote_address($handle->peerhost)->remote_port($handle->peerport);
85             }
86 936 50       3871 $tx->req->url->base->scheme('https') if $c->{tls};
87              
88 936         3506 weaken $self;
89             $tx->on(
90             request => sub {
91 936     936   2045 my $tx = shift;
92              
93 936         2896 my $req = $tx->req;
94 936 100       2953 if (my $error = $req->error) { $self->_trace($id, $error->{message}) }
  1         14  
95              
96             # WebSocket
97 936 100       2940 if ($req->is_handshake) {
98 72         591 my $ws = $self->{connections}{$id}{next} = Mojo::Transaction::WebSocket->new(handshake => $tx);
99 72         321 $self->emit(request => server_handshake $ws);
100             }
101              
102             # HTTP
103 864         3554 else { $self->emit(request => $tx) }
104              
105             # Last keep-alive request or corrupted connection
106 936         3178 my $c = $self->{connections}{$id};
107 936 100 100     5022 $tx->res->headers->connection('close') if ($c->{requests} || 1) >= $self->max_requests || $req->error;
      100        
108              
109 936         6336 $tx->on(resume => sub { $self->_write($id) });
  94         387  
110 936         3565 $self->_write($id);
111             }
112 936         7746 );
113              
114             # Kept alive if we have more than one request on the connection
115 936 100       5061 return ++$c->{requests} > 1 ? $tx->kept_alive(1) : $tx;
116             }
117              
118             sub _close {
119 237     237   659 my ($self, $id) = @_;
120 237 100       910 if (my $tx = $self->{connections}{$id}{tx}) { $tx->closed }
  68         301  
121 237         5481 delete $self->{connections}{$id};
122             }
123              
124 4 100   4   73 sub _trace { $_[0]->app->log->trace($_[2]) if $_[0]{connections}{$_[1]}{tx} }
125              
126             sub _finish {
127 1030     1030   2671 my ($self, $id) = @_;
128              
129             # Always remove connection for WebSockets
130 1030         2531 my $c = $self->{connections}{$id};
131 1030 100       3172 return unless my $tx = $c->{tx};
132 974 100       3923 return $self->_remove($id) if $tx->is_websocket;
133              
134             # Finish transaction
135 928         3690 delete($c->{tx})->closed;
136              
137             # Upgrade connection to WebSocket
138 928 100       3024 if (my $ws = delete $c->{next}) {
139              
140             # Successful upgrade
141 72 100       284 if ($ws->handshake->res->code == 101) {
142 62         266 $c->{tx} = $ws->established(1);
143 62         261 weaken $self;
144 62     246   451 $ws->on(resume => sub { $self->_write($id) });
  246         733  
145 62         185 $self->_write($id);
146             }
147              
148             # Failed upgrade
149 10         54 else { $ws->closed }
150             }
151              
152             # Close connection if necessary
153 928 100 100     2865 return $self->_remove($id) if $tx->error || !$tx->keep_alive;
154              
155             # Build new transaction for leftovers
156 901 100       2738 if (length(my $leftovers = $tx->req->content->leftovers)) {
157 1         7 $tx = $c->{tx} = $self->_build_tx($id, $c);
158 1         10 $tx->server_read($leftovers);
159             }
160              
161             # Keep-alive connection
162 901 100       4045 $self->ioloop->stream($id)->timeout($self->keep_alive_timeout) unless $c->{tx};
163             }
164              
165             sub _listen {
166 136     136   417 my ($self, $listen) = @_;
167              
168 136         645 my $url = Mojo::URL->new($listen);
169 136         568 my $proto = $url->protocol;
170 136 100 100     902 croak qq{Invalid listen location "$listen"} unless $proto eq 'http' || $proto eq 'https' || $proto eq 'http+unix';
      66        
171              
172 135         644 my $query = $url->query;
173 135         718 my $options = {backlog => $self->backlog};
174 135         672 $options->{$_} = $query->param($_) for qw(fd single_accept reuse);
175 135 50       549 if ($proto eq 'http+unix') { $options->{path} = $url->host }
  0         0  
176             else {
177 135 50       521 if ((my $host = $url->host) ne '*') { $options->{address} = $host }
  135         424  
178 135 50       507 if (my $port = $url->port) { $options->{port} = $port }
  0         0  
179             }
180              
181 135         453 $options->{tls_ca} = $query->param('ca');
182 135   33     307 /^(.*)_(cert|key)$/ and $options->{"tls_$2"}{$1} = $query->param($_) for @{$query->names};
  135         470  
183 135 50       518 if (my $cert = $query->param('cert')) { $options->{tls_cert}{''} = $cert }
  0         0  
184 135 50       461 if (my $key = $query->param('key')) { $options->{tls_key}{''} = $key }
  0         0  
185 135         423 my ($ciphers, $verify, $version) = ($query->param('ciphers'), $query->param('verify'), $query->param('version'));
186 135 50       529 $options->{tls_options}{SSL_cipher_list} = $ciphers if defined $ciphers;
187 135 50       433 $options->{tls_options}{SSL_verify_mode} = hex $verify if defined $verify;
188 135 50       372 $options->{tls_options}{SSL_version} = $version if defined $version;
189 135         524 my $tls = $options->{tls} = $proto eq 'https';
190              
191 135         620 weaken $self;
192 135         503 push @{$self->acceptors}, $self->ioloop->server(
193             $options => sub {
194 180     180   628 my ($loop, $stream, $id) = @_;
195              
196 180         931 $self->{connections}{$id} = {tls => $tls};
197 180         381 warn "-- Accept $id (@{[_peer($stream->handle)]})\n" if DEBUG;
198 180         793 $stream->timeout($self->inactivity_timeout);
199              
200 180 50       1584 $stream->on(close => sub { $self && $self->_close($id) });
  164         1037  
201 180 0 0     1316 $stream->on(error => sub { $self && $self->app->log->error(pop) && $self->_close($id) });
  0         0  
202 180         1208 $stream->on(read => sub { $self->_read($id => pop) });
  1125         4526  
203 180         1097 $stream->on(timeout => sub { $self->_trace($id, 'Inactivity timeout (see FAQ for more)') });
  3         36  
204             }
205 135         241 );
206              
207 134 50       664 return if $self->silent;
208 0         0 $self->app->log->info(qq{Listening at "$url"});
209 0         0 $query->pairs([]);
210 0 0       0 $url->host('127.0.0.1') if $url->host eq '*';
211 0 0 0     0 $url->port($self->ports->[-1]) if !$options->{path} && !$url->port;
212 0   0     0 say 'Web application available at ', $options->{path} // $url;
213             }
214              
215 0 0   0   0 sub _peer { $_[0]->isa('IO::Socket::UNIX') ? $_[0]->peerpath : $_[0]->peerhost }
216              
217             sub _read {
218 1125     1125   3144 my ($self, $id, $chunk) = @_;
219              
220             # Make sure we have a transaction
221 1125         2607 my $c = $self->{connections}{$id};
222 1125   66     5638 my $tx = $c->{tx} ||= $self->_build_tx($id, $c);
223 1125         1909 warn term_escape "-- Server <<< Client (@{[_url($tx)]})\n$chunk\n" if DEBUG;
224 1125         4275 $tx->server_read($chunk);
225             }
226              
227             sub _remove {
228 73     73   266 my ($self, $id) = @_;
229 73         247 $self->ioloop->remove($id);
230 73         372 $self->_close($id);
231             }
232              
233 0     0   0 sub _url { shift->req->url->to_abs }
234              
235             sub _write {
236 2316     2316   5298 my ($self, $id) = @_;
237              
238             # Protect from resume event recursion
239 2316         5630 my $c = $self->{connections}{$id};
240 2316 100 100     11179 return if !(my $tx = $c->{tx}) || $c->{writing};
241 2274         5540 local $c->{writing} = 1;
242 2274         7700 my $chunk = $tx->server_write;
243 2274         3654 warn term_escape "-- Server >>> Client (@{[_url($tx)]})\n$chunk\n" if DEBUG;
244 2274 100       7229 my $next = $tx->is_finished ? '_finish' : length $chunk ? '_write' : undef;
    100          
245 2274 100       5681 return $self->ioloop->stream($id)->write($chunk) unless $next;
246 2034         6964 weaken $self;
247 2034     2008   5696 $self->ioloop->stream($id)->write($chunk => sub { $self->$next($id) });
  2008         8570  
248             }
249              
250             1;
251              
252             =encoding utf8
253              
254             =head1 NAME
255              
256             Mojo::Server::Daemon - Non-blocking I/O HTTP and WebSocket server
257              
258             =head1 SYNOPSIS
259              
260             use Mojo::Server::Daemon;
261              
262             my $daemon = Mojo::Server::Daemon->new(listen => ['http://*:8080']);
263             $daemon->unsubscribe('request')->on(request => sub ($daemon, $tx) {
264              
265             # Request
266             my $method = $tx->req->method;
267             my $path = $tx->req->url->path;
268              
269             # Response
270             $tx->res->code(200);
271             $tx->res->headers->content_type('text/plain');
272             $tx->res->body("$method request for $path!");
273              
274             # Resume transaction
275             $tx->resume;
276             });
277             $daemon->run;
278              
279             =head1 DESCRIPTION
280              
281             L is a full featured, highly portable non-blocking I/O HTTP and WebSocket server, with IPv6, TLS,
282             SNI, Comet (long polling), keep-alive and multiple event loop support.
283              
284             For better scalability (epoll, kqueue) and to provide non-blocking name resolution, SOCKS5 as well as TLS support, the
285             optional modules L (4.32+), L (0.15+), L (0.64+) and L
286             (2.009+) will be used automatically if possible. Individual features can also be disabled with the C,
287             C and C environment variables.
288              
289             See L for more.
290              
291             =head1 SIGNALS
292              
293             The L process can be controlled at runtime with the following signals.
294              
295             =head2 INT, TERM
296              
297             Shut down server immediately.
298              
299             =head1 EVENTS
300              
301             L inherits all events from L.
302              
303             =head1 ATTRIBUTES
304              
305             L inherits all attributes from L and implements the following new ones.
306              
307             =head2 acceptors
308              
309             my $acceptors = $daemon->acceptors;
310             $daemon = $daemon->acceptors(['6be0c140ef00a389c5d039536b56d139']);
311              
312             Active acceptor ids.
313              
314             # Check port
315             mu $port = $daemon->ioloop->acceptor($daemon->acceptors->[0])->port;
316              
317             =head2 backlog
318              
319             my $backlog = $daemon->backlog;
320             $daemon = $daemon->backlog(128);
321              
322             Listen backlog size, defaults to C.
323              
324             =head2 inactivity_timeout
325              
326             my $timeout = $daemon->inactivity_timeout;
327             $daemon = $daemon->inactivity_timeout(5);
328              
329             Maximum amount of time in seconds a connection with an active request can be inactive before getting closed, defaults
330             to the value of the C environment variable or C<30>. Setting the value to C<0> will allow
331             connections to be inactive indefinitely.
332              
333             =head2 ioloop
334              
335             my $loop = $daemon->ioloop;
336             $daemon = $daemon->ioloop(Mojo::IOLoop->new);
337              
338             Event loop object to use for I/O operations, defaults to the global L singleton.
339              
340             =head2 keep_alive_timeout
341              
342             my $timeout = $daemon->keep_alive_timeout;
343             $daemon = $daemon->keep_alive_timeout(10);
344              
345             Maximum amount of time in seconds a connection without an active request can be inactive before getting closed,
346             defaults to the value of the C environment variable or C<5>. Setting the value to C<0> will
347             allow connections to be inactive indefinitely.
348              
349             =head2 listen
350              
351             my $listen = $daemon->listen;
352             $daemon = $daemon->listen(['https://127.0.0.1:8080']);
353              
354             Array reference with one or more locations to listen on, defaults to the value of the C environment
355             variable or C (shortcut for C).
356              
357             # Listen on all IPv4 interfaces
358             $daemon->listen(['http://*:3000']);
359              
360             # Listen on all IPv4 and IPv6 interfaces
361             $daemon->listen(['http://[::]:3000']);
362              
363             # Listen on IPv6 interface
364             $daemon->listen(['http://[::1]:4000']);
365              
366             # Listen on IPv4 and IPv6 interfaces
367             $daemon->listen(['http://127.0.0.1:3000', 'http://[::1]:3000']);
368              
369             # Listen on UNIX domain socket "/tmp/myapp.sock" (percent encoded slash)
370             $daemon->listen(['http+unix://%2Ftmp%2Fmyapp.sock']);
371              
372             # File descriptor, as used by systemd
373             $daemon->listen(['http://127.0.0.1?fd=3']);
374              
375             # Allow multiple servers to use the same port (SO_REUSEPORT)
376             $daemon->listen(['http://*:8080?reuse=1']);
377              
378             # Listen on two ports with HTTP and HTTPS at the same time
379             $daemon->listen(['http://*:3000', 'https://*:4000']);
380              
381             # Use a custom certificate and key
382             $daemon->listen(['https://*:3000?cert=/x/server.crt&key=/y/server.key']);
383              
384             # Domain specific certificates and keys (SNI)
385             $daemon->listen(
386             ['https://*:3000?example.com_cert=/x/my.crt&example.com_key=/y/my.key']);
387              
388             # Or even a custom certificate authority
389             $daemon->listen(
390             ['https://*:3000?cert=/x/server.crt&key=/y/server.key&ca=/z/ca.crt']);
391              
392             These parameters are currently available:
393              
394             =over 2
395              
396             =item ca
397              
398             ca=/etc/tls/ca.crt
399              
400             Path to TLS certificate authority file used to verify the peer certificate.
401              
402             =item cert
403              
404             cert=/etc/tls/server.crt
405             mojolicious.org_cert=/etc/tls/mojo.crt
406              
407             Path to the TLS cert file, defaults to a built-in test certificate.
408              
409             =item ciphers
410              
411             ciphers=AES128-GCM-SHA256:RC4:HIGH:!MD5:!aNULL:!EDH
412              
413             TLS cipher specification string. For more information about the format see
414             L.
415              
416             =item fd
417              
418             fd=3
419              
420             File descriptor with an already prepared listen socket.
421              
422             =item key
423              
424             key=/etc/tls/server.key
425             mojolicious.org_key=/etc/tls/mojo.key
426              
427             Path to the TLS key file, defaults to a built-in test key.
428              
429             =item reuse
430              
431             reuse=1
432              
433             Allow multiple servers to use the same port with the C socket option.
434              
435             =item single_accept
436              
437             single_accept=1
438              
439             Only accept one connection at a time.
440              
441             =item verify
442              
443             verify=0x00
444              
445             TLS verification mode.
446              
447             =item version
448              
449             version=TLSv1_2
450              
451             TLS protocol version.
452              
453             =back
454              
455             =head2 max_clients
456              
457             my $max = $daemon->max_clients;
458             $daemon = $daemon->max_clients(100);
459              
460             Maximum number of accepted connections this server is allowed to handle concurrently, before stopping to accept new
461             incoming connections, passed along to L.
462              
463             =head2 max_requests
464              
465             my $max = $daemon->max_requests;
466             $daemon = $daemon->max_requests(250);
467              
468             Maximum number of keep-alive requests per connection, defaults to C<100>.
469              
470             =head2 silent
471              
472             my $bool = $daemon->silent;
473             $daemon = $daemon->silent($bool);
474              
475             Disable console messages.
476              
477             =head1 METHODS
478              
479             L inherits all methods from L and implements the following new ones.
480              
481             =head2 ports
482              
483             my $ports = $daemon->ports;
484              
485             Get all ports this server is currently listening on.
486              
487             # All ports
488             say for @{$daemon->ports};
489              
490             =head2 run
491              
492             $daemon->run;
493              
494             Run server and wait for L.
495              
496             =head2 start
497              
498             $daemon = $daemon->start;
499              
500             Start or resume accepting connections through L.
501              
502             # Listen on random port
503             my $port = $daemon->listen(['http://127.0.0.1'])->start->ports->[0];
504              
505             # Run multiple web servers concurrently
506             my $daemon1 = Mojo::Server::Daemon->new(listen => ['http://*:3000'])->start;
507             my $daemon2 = Mojo::Server::Daemon->new(listen => ['http://*:4000'])->start;
508             Mojo::IOLoop->start unless Mojo::IOLoop->is_running;
509              
510             =head2 stop
511              
512             $daemon = $daemon->stop;
513              
514             Stop accepting connections through L.
515              
516             =head1 DEBUGGING
517              
518             You can set the C environment variable to get some advanced diagnostics information printed to
519             C.
520              
521             MOJO_SERVER_DEBUG=1
522              
523             =head1 SEE ALSO
524              
525             L, L, L.
526              
527             =cut