File Coverage

blib/lib/Plack/Handler/Gazelle.pm
Criterion Covered Total %
statement 166 184 90.2
branch 54 80 67.5
condition 19 30 63.3
subroutine 28 28 100.0
pod 0 3 0.0
total 267 325 82.1


line stmt bran cond sub pod time code
1             package Plack::Handler::Gazelle;
2              
3 205     205   212944877 use 5.008001;
  205         2559  
4 205     205   2139 use strict;
  205         1575  
  205         12928  
5 205     205   2025 use warnings;
  205         1109  
  205         20590  
6 205     205   2657 use IO::Socket::INET;
  205         20624  
  205         14276  
7 205     205   252639 use Plack::Util;
  205         4792  
  205         7556  
8 205     205   86499 use Stream::Buffered;
  205         1218880  
  205         8008  
9 205     205   2320 use POSIX qw(EINTR EAGAIN EWOULDBLOCK);
  205         7441  
  205         7600  
10 205     205   44462 use Socket qw(IPPROTO_TCP TCP_NODELAY);
  205         554  
  205         14468  
11 205     205   94628 use Parallel::Prefork;
  205         986713  
  205         4425  
12 205     205   105669 use Server::Starter ();
  205         1205701  
  205         5167  
13 205     205   85970 use Guard;
  205         119903  
  205         13729  
14              
15             our $VERSION = "0.49";
16              
17 205     205   1599 use XSLoader;
  205         5542  
  205         9829  
18             XSLoader::load(__PACKAGE__, $VERSION);
19              
20 205     205   1224 use constant MAX_REQUEST_SIZE => 131072;
  205         4993  
  205         33172  
21 205     205   1285 use constant CHUNKSIZE => 64 * 1024;
  205         366  
  205         193602  
22              
23 205     205   1454 my $null_io = do { open my $io, "<", \""; $io };
  205         330  
  205         5928  
24             my $bad_response = [ 400, [ 'Content-Type' => 'text/plain', 'Connection' => 'close' ], [ 'Bad Request' ] ];
25              
26             sub new {
27 204     204 0 9881 my($class, %args) = @_;
28              
29             # setup before instantiation
30 204 50       1410 if ($args{listen_sock}) {
    50          
31 0         0 $args{host} = $args{listen_sock}->sockhost;
32 0         0 $args{port} = $args{listen_sock}->sockport;
33             }
34             elsif (defined $ENV{SERVER_STARTER_PORT}) {
35 0         0 my ($hostport, $fd) = %{Server::Starter::server_ports()};
  0         0  
36 0 0       0 if ($hostport =~ /(.*):(\d+)/) {
37 0         0 $args{host} = $1;
38 0         0 $args{port} = $2;
39             } else {
40 0         0 $args{port} = $hostport;
41             }
42 0 0       0 $args{listen_sock} = IO::Socket::INET->new(
43             Proto => 'tcp',
44             ) or die "failed to create socket:$!";
45 0 0       0 $args{listen_sock}->fdopen($fd, 'w')
46             or die "failed to bind to listening socket:$!";
47             }
48              
49 204         410 my $max_workers = 10;
50 204         496 for (qw(max_workers workers)) {
51             $max_workers = delete $args{$_}
52 408 100       1126 if defined $args{$_};
53             }
54              
55 204 100       602 if ($args{child_exit}) {
56 2 50       116 $args{child_exit} = eval $args{child_exit} unless ref($args{child_exit});
57 2 50       10 die "child_exit is defined but not a code block" if ref($args{child_exit}) ne 'CODE';
58             }
59              
60             my $self = bless {
61             server_software => $args{server_software} || $class,
62       202     server_ready => $args{server_ready} || sub {},
63             listen_sock => $args{listen_sock},
64             host => $args{host} || 0,
65             port => $args{port} || 8080,
66             timeout => $args{timeout} || 300,
67             max_workers => $max_workers,
68       183     child_exit => $args{child_exit} || sub {},
69             min_reqs_per_child => (
70             defined $args{min_reqs_per_child}
71             ? $args{min_reqs_per_child} : undef,
72             ),
73             max_reqs_per_child => (
74             $args{max_reqs_per_child} || $args{max_requests} || 1000,
75             ),
76             spawn_interval => $args{spawn_interval} || 0,
77             err_respawn_interval => (
78             defined $args{err_respawn_interval}
79             ? $args{err_respawn_interval} : undef,
80 204 50 33     9266 ),
    50 100        
      100        
      50        
      50        
      100        
      50        
      50        
81             }, $class;
82              
83 204         1747 $self;
84             }
85              
86             sub setup_listener {
87 204     204 0 360 my $self = shift;
88             $self->{listen_sock} ||= IO::Socket::INET->new(
89             Listen => SOMAXCONN,
90             LocalPort => $self->{port},
91             LocalAddr => $self->{host},
92 204 50 33     4765 Proto => 'tcp',
93             ReuseAddr => 1,
94             ) or die "failed to listen to port $self->{port}:$!";
95              
96 204         117364 my $family = Socket::sockaddr_family(getsockname($self->{listen_sock}));
97 204         758 $self->{_listen_sock_is_tcp} = $family != AF_UNIX;
98              
99             # set defer accept
100 204 50 33     1452 if ($^O eq 'linux' && $self->{_listen_sock_is_tcp}) {
101 204         1495 setsockopt($self->{listen_sock}, IPPROTO_TCP, 9, 1);
102             }
103 204         938 $self->{server_ready}->($self);
104             }
105              
106              
107             sub run {
108 204     204 0 9628 my($self, $app) = @_;
109 204         820 $self->setup_listener();
110             # use Parallel::Prefork
111             my %pm_args = (
112             max_workers => $self->{max_workers},
113 204         972 trap_signals => {
114             HUP => 'TERM',
115             },
116             );
117 204 50       598 if (defined $self->{spawn_interval}) {
118 204         543 $pm_args{trap_signals}{USR1} = [ 'TERM', $self->{spawn_interval} ];
119 204         439 $pm_args{spawn_interval} = $self->{spawn_interval};
120             }
121 204 50       641 if (defined $self->{err_respawn_interval}) {
122 0         0 $pm_args{err_respawn_interval} = $self->{err_respawn_interval};
123             }
124 204         1379 my $pm = Parallel::Prefork->new(\%pm_args);
125              
126             local $SIG{TERM} = sub {
127             #tell the socket we're done reading (stops new connections, existing will continue)
128             $self->{listen_sock}->shutdown(0)
129 19 50   19   4467172 if not defined $ENV{SERVER_STARTER_PORT};
130              
131 19         3456 $pm->signal_received('TERM');
132 19         585 $pm->signal_all_children('TERM');
133 204         17680 };
134 204         964 while ($pm->signal_received !~ /^(TERM|USR1)$/) {
135             $pm->start(sub{
136 185     185   13886357 srand((rand() * 2 ** 30) ^ $$ ^ time);
137              
138             my $max_reqs_per_child = $self->_calc_minmax_per_child(
139             $self->{max_reqs_per_child},
140             $self->{min_reqs_per_child}
141 185         9062 );
142              
143 185         1843 my $proc_req_count = 0;
144 185         3439 $self->{term_received} = 0;
145             local $SIG{TERM} = sub {
146 161         7968 $self->{term_received}++;
147 185         11598 };
148 185         4435 local $SIG{PIPE} = 'IGNORE';
149             PROC_LOOP:
150 185         2801 while ( $proc_req_count < $max_reqs_per_child) {
151 619 100       4354 if ( $self->{term_received} ) {
152 161         3192 $self->{child_exit}->($self, $app);
153 161         36303 exit 0;
154             }
155 458 100 100     28869646 if ( my ($conn, $buf, $env) = accept_psgi(
      50        
156             fileno($self->{listen_sock}), $self->{timeout}, $self->{_listen_sock_is_tcp},
157             $self->{host} || 0, $self->{port} || 0
158             ) ) {
159 147         4540 my $guard = guard { close_client($conn) };
  147         1013605  
160 147         888 ++$proc_req_count;
161              
162 147         756 my $res = $bad_response;
163 205     205   1515 my $chunked = do { no warnings; lc delete $env->{HTTP_TRANSFER_ENCODING} eq 'chunked' };
  205         564  
  205         208807  
  147         543  
  147         1197  
164 147 100       1479 if (my $cl = $env->{CONTENT_LENGTH}) {
    100          
165 9         543 my $buffer = Stream::Buffered->new($cl);
166 9         850 while ($cl > 0) {
167 29         117 my $chunk = "";
168 29 100       149 if (length $buf) {
169 1         7 $chunk = $buf;
170 1         8 $buf = '';
171             } else {
172             read_timeout(
173             $conn, \$chunk, $cl, 0, $self->{timeout})
174 28 50       681 or next PROC_LOOP;
175             }
176 29         206 $buffer->print($chunk);
177 29         1155 $cl -= length $chunk;
178             }
179 9         87 $env->{'psgi.input'} = $buffer->rewind;
180             } elsif ( $chunked ) {
181 1         52 my $buffer = Stream::Buffered->new($cl);
182 1         154 my $chunk_buffer = '';
183 1         4 my $length;
184 1         7 DECHUNK: while(1) {
185 10         21 my $chunk = "";
186 10 50       31 if ( length $buf ) {
187 0         0 $chunk = $buf;
188 0         0 $buf = '';
189             }
190             else {
191             read_timeout(
192             $conn, \$chunk, 16384, 0, $self->{timeout})
193 10 50       505 or next PROC_LOOP;
194             }
195              
196 10         76 $chunk_buffer .= $chunk;
197 10         114 while ( $chunk_buffer =~ s/^(([0-9a-fA-F]+).*\015\012)// ) {
198 16         60 my $trailer = $1;
199 16         40 my $chunk_len = hex $2;
200 16 100       37 if ($chunk_len == 0) {
    50          
201 1         3 last DECHUNK;
202             } elsif (length $chunk_buffer < $chunk_len + 2) {
203 0         0 $chunk_buffer = $trailer . $chunk_buffer;
204 0         0 last;
205             }
206 15         61 $buffer->print(substr $chunk_buffer, 0, $chunk_len, '');
207 15         264 $chunk_buffer =~ s/^\015\012//;
208 15         37 $length += $chunk_len;
209             }
210             }
211 1         12 $env->{CONTENT_LENGTH} = $length;
212 1         7 $env->{'psgi.input'} = $buffer->rewind;
213             } else {
214 137         1579 $env->{'psgi.input'} = $null_io;
215             }
216             $env->{'psgix.informational'} = sub {
217 1         55 my ($status,$headers) = @_;
218 1         77 write_informational_response($conn, $self->{timeout}, $status, $headers);
219 147         4563 };
220 147         3284 $res = Plack::Util::run_app $app, $env;
221 146 100       42376 my $use_chunked = $env->{"SERVER_PROTOCOL"} eq 'HTTP/1.1' ? 1 : 0;
222 146 100       1310 if (ref $res eq 'ARRAY') {
    50          
223 134         1244 $self->_handle_response($res, $conn, $use_chunked);
224             } elsif (ref $res eq 'CODE') {
225             $res->(sub {
226 12         640 $self->_handle_response($_[0], $conn, $use_chunked);
227 12         232 });
228             } else {
229 0         0 die "Bad response $res";
230             }
231 146 100       6049 if ($env->{'psgix.harakiri.commit'}) {
232 23         139 $self->{child_exit}->($self, $app);
233 23         2564 exit 0;
234             }
235             }
236             }
237 204         3242 });
238             }
239 19         5530 while ($pm->wait_all_children(1)) {
240 0         0 $pm->signal_all_children('TERM');
241             }
242             }
243              
244             sub _calc_minmax_per_child {
245 185     185   2291 my $self = shift;
246 185         2047 my ($max,$min) = @_;
247 185 50       3917 if (defined $min) {
248 0         0 return $max - int(($max - $min + 1) * rand);
249             } else {
250 185         1720 return $max;
251             }
252             }
253              
254             sub _handle_response {
255 146     146   702 my($self, $res, $conn, $use_chunked) = @_;
256 146         526 my $status_code = $res->[0];
257 146         403 my $headers = $res->[1];
258 146         401 my $body = $res->[2];
259              
260 146 100 100     2441 if (defined $body && ref $body eq 'ARRAY' ) {
261 130         19193 write_psgi_response($conn, $self->{timeout}, $status_code, $headers , $body, $use_chunked);
262 130         702 return;
263             }
264              
265 16 50       2640 write_psgi_response_header($conn, $self->{timeout}, $status_code, $headers, [], $use_chunked) or return;
266              
267 16 100       147 if (defined $body) {
268 10         41 my $failed;
269             Plack::Util::foreach(
270             $body,
271             sub {
272 16 50   16   2690 return if $failed;
273 16         38 my $ret;
274 16 100       116 if ( $use_chunked ) {
275 5         274 $ret = write_chunk($conn, $_[0], 0, $self->{timeout});
276             }
277             else {
278 11         721 $ret = write_all($conn, $_[0], 0, $self->{timeout});
279             }
280 16 50       443 $failed = 1 if ! defined $ret;
281             },
282 10         333 );
283 10 100       7690 write_all($conn, "0\015\012\015\012", 0, $self->{timeout}) if $use_chunked;
284             } else {
285             return Plack::Util::inline_object
286             write => sub {
287 14 100   14   822 if ( $use_chunked ) {
288 7         181 write_chunk($conn, $_[0], 0, $self->{timeout});
289             }
290             else {
291 7         193 write_all($conn, $_[0], 0, $self->{timeout});
292             }
293             },
294             close => sub {
295 6 100   6   598 write_all($conn, "0\015\012\015\012", 0, $self->{timeout}) if $use_chunked;
296 6         275 };
297             }
298             }
299              
300             1;