File Coverage

blib/lib/Plack/Handler/Gazelle.pm
Criterion Covered Total %
statement 168 184 91.3
branch 55 80 68.7
condition 19 30 63.3
subroutine 28 28 100.0
pod 0 3 0.0
total 270 325 83.0


line stmt bran cond sub pod time code
1             package Plack::Handler::Gazelle;
2              
3 205     205   255558672 use 5.008001;
  205         2598  
4 205     205   2438 use strict;
  205         891  
  205         13037  
5 205     205   1902 use warnings;
  205         2268  
  205         20075  
6 205     205   2944 use IO::Socket::INET;
  205         35278  
  205         11648  
7 205     205   227393 use Plack::Util;
  205         3695  
  205         7232  
8 205     205   80940 use Stream::Buffered;
  205         1273871  
  205         8025  
9 205     205   1999 use POSIX qw(EINTR EAGAIN EWOULDBLOCK);
  205         5377  
  205         6345  
10 205     205   41264 use Socket qw(IPPROTO_TCP TCP_NODELAY);
  205         431  
  205         12370  
11 205     205   181565 use Parallel::Prefork;
  205         1186538  
  205         4376  
12 205     205   97166 use Server::Starter ();
  205         1107141  
  205         4283  
13 205     205   78244 use Guard;
  205         107472  
  205         11971  
14              
15             our $VERSION = "0.48";
16              
17 205     205   1338 use XSLoader;
  205         17146  
  205         9068  
18             XSLoader::load(__PACKAGE__, $VERSION);
19              
20 205     205   1180 use constant MAX_REQUEST_SIZE => 131072;
  205         2569  
  205         30924  
21 205     205   1205 use constant CHUNKSIZE => 64 * 1024;
  205         304  
  205         180262  
22              
23 205     205   1461 my $null_io = do { open my $io, "<", \""; $io };
  205         422  
  205         5204  
24             my $bad_response = [ 400, [ 'Content-Type' => 'text/plain', 'Connection' => 'close' ], [ 'Bad Request' ] ];
25              
26             sub new {
27 204     204 0 9620 my($class, %args) = @_;
28              
29             # setup before instantiation
30 204 50       2125 if ($args{listen_sock}) {
    50          
31 0         0 $args{host} = $args{listen_sock}->sockhost;
32 0         0 $args{port} = $args{listen_sock}->sockport;
33             }
34             elsif (defined $ENV{SERVER_STARTER_PORT}) {
35 0         0 my ($hostport, $fd) = %{Server::Starter::server_ports()};
  0         0  
36 0 0       0 if ($hostport =~ /(.*):(\d+)/) {
37 0         0 $args{host} = $1;
38 0         0 $args{port} = $2;
39             } else {
40 0         0 $args{port} = $hostport;
41             }
42 0 0       0 $args{listen_sock} = IO::Socket::INET->new(
43             Proto => 'tcp',
44             ) or die "failed to create socket:$!";
45 0 0       0 $args{listen_sock}->fdopen($fd, 'w')
46             or die "failed to bind to listening socket:$!";
47             }
48              
49 204         451 my $max_workers = 10;
50 204         441 for (qw(max_workers workers)) {
51             $max_workers = delete $args{$_}
52 408 100       1102 if defined $args{$_};
53             }
54              
55 204 100       545 if ($args{child_exit}) {
56 2 50       130 $args{child_exit} = eval $args{child_exit} unless ref($args{child_exit});
57 2 50       10 die "child_exit is defined but not a code block" if ref($args{child_exit}) ne 'CODE';
58             }
59              
60             my $self = bless {
61             server_software => $args{server_software} || $class,
62       202     server_ready => $args{server_ready} || sub {},
63             listen_sock => $args{listen_sock},
64             host => $args{host} || 0,
65             port => $args{port} || 8080,
66             timeout => $args{timeout} || 300,
67             max_workers => $max_workers,
68       183     child_exit => $args{child_exit} || sub {},
69             min_reqs_per_child => (
70             defined $args{min_reqs_per_child}
71             ? $args{min_reqs_per_child} : undef,
72             ),
73             max_reqs_per_child => (
74             $args{max_reqs_per_child} || $args{max_requests} || 1000,
75             ),
76             spawn_interval => $args{spawn_interval} || 0,
77             err_respawn_interval => (
78             defined $args{err_respawn_interval}
79             ? $args{err_respawn_interval} : undef,
80 204 50 33     9289 ),
    50 100        
      100        
      50        
      50        
      100        
      50        
      50        
81             }, $class;
82              
83 204         1618 $self;
84             }
85              
86             sub setup_listener {
87 204     204 0 397 my $self = shift;
88             $self->{listen_sock} ||= IO::Socket::INET->new(
89             Listen => SOMAXCONN,
90             LocalPort => $self->{port},
91             LocalAddr => $self->{host},
92 204 50 33     4870 Proto => 'tcp',
93             ReuseAddr => 1,
94             ) or die "failed to listen to port $self->{port}:$!";
95              
96 204         119501 my $family = Socket::sockaddr_family(getsockname($self->{listen_sock}));
97 204         758 $self->{_listen_sock_is_tcp} = $family != AF_UNIX;
98              
99             # set defer accept
100 204 50 33     1522 if ($^O eq 'linux' && $self->{_listen_sock_is_tcp}) {
101 204         1447 setsockopt($self->{listen_sock}, IPPROTO_TCP, 9, 1);
102             }
103 204         837 $self->{server_ready}->($self);
104             }
105              
106              
107             sub run {
108 204     204 0 9540 my($self, $app) = @_;
109 204         688 $self->setup_listener();
110             # use Parallel::Prefork
111             my %pm_args = (
112             max_workers => $self->{max_workers},
113 204         924 trap_signals => {
114             HUP => 'TERM',
115             },
116             );
117 204 50       601 if (defined $self->{spawn_interval}) {
118 204         619 $pm_args{trap_signals}{USR1} = [ 'TERM', $self->{spawn_interval} ];
119 204         425 $pm_args{spawn_interval} = $self->{spawn_interval};
120             }
121 204 50       544 if (defined $self->{err_respawn_interval}) {
122 0         0 $pm_args{err_respawn_interval} = $self->{err_respawn_interval};
123             }
124 204         1377 my $pm = Parallel::Prefork->new(\%pm_args);
125              
126             local $SIG{TERM} = sub {
127             #tell the socket we're done reading (stops new connections, existing will continue)
128             $self->{listen_sock}->shutdown(0)
129 19 50   19   4635084 if not defined $ENV{SERVER_STARTER_PORT};
130              
131 19         2713 $pm->signal_received('TERM');
132 19         586 $pm->signal_all_children('TERM');
133 204         17041 };
134 204         936 while ($pm->signal_received !~ /^(TERM|USR1)$/) {
135             $pm->start(sub{
136 185     185   15276341 srand((rand() * 2 ** 30) ^ $$ ^ time);
137              
138             my $max_reqs_per_child = $self->_calc_minmax_per_child(
139             $self->{max_reqs_per_child},
140             $self->{min_reqs_per_child}
141 185         8442 );
142              
143 185         1446 my $proc_req_count = 0;
144 185         3307 $self->{term_received} = 0;
145             local $SIG{TERM} = sub {
146 161         3313 $self->{term_received}++;
147 185         28217 };
148 185         3880 local $SIG{PIPE} = 'IGNORE';
149             PROC_LOOP:
150 185         2852 while ( $proc_req_count < $max_reqs_per_child) {
151 780 100       4960 if ( $self->{term_received} ) {
152 161         2789 $self->{child_exit}->($self, $app);
153 161         55288 exit 0;
154             }
155 619 100 100     30348312 if ( my ($conn, $buf, $env) = accept_psgi(
      50        
156             fileno($self->{listen_sock}), $self->{timeout}, $self->{_listen_sock_is_tcp},
157             $self->{host} || 0, $self->{port} || 0
158             ) ) {
159 147         3360 my $guard = guard { close_client($conn) };
  147         1011939  
160 147         740 ++$proc_req_count;
161              
162 147         671 my $res = $bad_response;
163 205     205   1365 my $chunked = do { no warnings; lc delete $env->{HTTP_TRANSFER_ENCODING} eq 'chunked' };
  205         597  
  205         352277  
  147         423  
  147         878  
164 147 100       1312 if (my $cl = $env->{CONTENT_LENGTH}) {
    100          
165 9         484 my $buffer = Stream::Buffered->new($cl);
166 9         779 while ($cl > 0) {
167 32         55 my $chunk = "";
168 32 100       74 if (length $buf) {
169 1         2 $chunk = $buf;
170 1         2 $buf = '';
171             } else {
172             read_timeout(
173             $conn, \$chunk, $cl, 0, $self->{timeout})
174 31 50       1201 or next PROC_LOOP;
175             }
176 32         184 $buffer->print($chunk);
177 32         855 $cl -= length $chunk;
178             }
179 9         94 $env->{'psgi.input'} = $buffer->rewind;
180             } elsif ( $chunked ) {
181 1         46 my $buffer = Stream::Buffered->new($cl);
182 1         125 my $chunk_buffer = '';
183 1         3 my $length;
184 1         3 DECHUNK: while(1) {
185 2         8 my $chunk = "";
186 2 100       6 if ( length $buf ) {
187 1         2 $chunk = $buf;
188 1         2 $buf = '';
189             }
190             else {
191             read_timeout(
192             $conn, \$chunk, 16384, 0, $self->{timeout})
193 1 50       53 or next PROC_LOOP;
194             }
195              
196 2         39 $chunk_buffer .= $chunk;
197 2         58 while ( $chunk_buffer =~ s/^(([0-9a-fA-F]+).*\015\012)// ) {
198 16         40 my $trailer = $1;
199 16         30 my $chunk_len = hex $2;
200 16 100       37 if ($chunk_len == 0) {
    50          
201 1         3 last DECHUNK;
202             } elsif (length $chunk_buffer < $chunk_len + 2) {
203 0         0 $chunk_buffer = $trailer . $chunk_buffer;
204 0         0 last;
205             }
206 15         55 $buffer->print(substr $chunk_buffer, 0, $chunk_len, '');
207 15         272 $chunk_buffer =~ s/^\015\012//;
208 15         52 $length += $chunk_len;
209             }
210             }
211 1         9 $env->{CONTENT_LENGTH} = $length;
212 1         5 $env->{'psgi.input'} = $buffer->rewind;
213             } else {
214 137         1048 $env->{'psgi.input'} = $null_io;
215             }
216             $env->{'psgix.informational'} = sub {
217 1         46 my ($status,$headers) = @_;
218 1         59 write_informational_response($conn, $self->{timeout}, $status, $headers);
219 147         3310 };
220 147         2331 $res = Plack::Util::run_app $app, $env;
221 146 100       27441 my $use_chunked = $env->{"SERVER_PROTOCOL"} eq 'HTTP/1.1' ? 1 : 0;
222 146 100       1401 if (ref $res eq 'ARRAY') {
    50          
223 134         780 $self->_handle_response($res, $conn, $use_chunked);
224             } elsif (ref $res eq 'CODE') {
225             $res->(sub {
226 12         541 $self->_handle_response($_[0], $conn, $use_chunked);
227 12         166 });
228             } else {
229 0         0 die "Bad response $res";
230             }
231 146 100       5271 if ($env->{'psgix.harakiri.commit'}) {
232 23         143 $self->{child_exit}->($self, $app);
233 23         2126 exit 0;
234             }
235             }
236             }
237 204         2682 });
238             }
239 19         4917 while ($pm->wait_all_children(1)) {
240 0         0 $pm->signal_all_children('TERM');
241             }
242             }
243              
244             sub _calc_minmax_per_child {
245 185     185   2174 my $self = shift;
246 185         2070 my ($max,$min) = @_;
247 185 50       3481 if (defined $min) {
248 0         0 return $max - int(($max - $min + 1) * rand);
249             } else {
250 185         2299 return $max;
251             }
252             }
253              
254             sub _handle_response {
255 146     146   643 my($self, $res, $conn, $use_chunked) = @_;
256 146         468 my $status_code = $res->[0];
257 146         359 my $headers = $res->[1];
258 146         482 my $body = $res->[2];
259              
260 146 100 100     1767 if (defined $body && ref $body eq 'ARRAY' ) {
261 130         19006 write_psgi_response($conn, $self->{timeout}, $status_code, $headers , $body, $use_chunked);
262 130         535 return;
263             }
264              
265 16 50       1956 write_psgi_response_header($conn, $self->{timeout}, $status_code, $headers, [], $use_chunked) or return;
266              
267 16 100       91 if (defined $body) {
268 10         13 my $failed;
269             Plack::Util::foreach(
270             $body,
271             sub {
272 16 50   16   2039 return if $failed;
273 16         28 my $ret;
274 16 100       94 if ( $use_chunked ) {
275 5         167 $ret = write_chunk($conn, $_[0], 0, $self->{timeout});
276             }
277             else {
278 11         467 $ret = write_all($conn, $_[0], 0, $self->{timeout});
279             }
280 16 50       282 $failed = 1 if ! defined $ret;
281             },
282 10         175 );
283 10 100       4602 write_all($conn, "0\015\012\015\012", 0, $self->{timeout}) if $use_chunked;
284             } else {
285             return Plack::Util::inline_object
286             write => sub {
287 14 100   14   762 if ( $use_chunked ) {
288 7         162 write_chunk($conn, $_[0], 0, $self->{timeout});
289             }
290             else {
291 7         231 write_all($conn, $_[0], 0, $self->{timeout});
292             }
293             },
294             close => sub {
295 6 100   6   584 write_all($conn, "0\015\012\015\012", 0, $self->{timeout}) if $use_chunked;
296 6         268 };
297             }
298             }
299              
300             1;