File Coverage

blib/lib/Porbo/Server.pm
Criterion Covered Total %
statement 154 198 77.7
branch 26 60 43.3
condition 2 5 40.0
subroutine 33 39 84.6
pod 0 5 0.0
total 215 307 70.0


line stmt bran cond sub pod time code
1             package Porbo::Server;
2 1     1   4 use strict;
  1         1  
  1         42  
3 1     1   7 use warnings;
  1         1  
  1         115  
4              
5 1     1   5 use Socket qw(IPPROTO_TCP TCP_NODELAY);
  1         1  
  1         443  
6              
7 1     1   550 use URI;
  1         3760  
  1         28  
8              
9 1     1   5 use AnyEvent;
  1         1  
  1         44  
10 1     1   5 use AnyEvent::Handle;
  1         1  
  1         27  
11 1     1   546 use AnyEvent::Socket;
  1         11460  
  1         112  
12              
13 1     1   454 use HTTP::Status;
  1         2699  
  1         228  
14 1     1   430 use HTTP::Date;
  1         3056  
  1         48  
15 1     1   5 use Plack::Util;
  1         1  
  1         28  
16 1     1   373 use Plack::HTTPParser qw(parse_http_request);
  1         1536  
  1         71  
17 1     1   397 use Plack::Middleware::ContentLength;
  1         2781  
  1         38  
18              
19 1     1   6 use constant DEBUG => $ENV{PORBO_DEBUG};
  1         2  
  1         1453  
20              
21 1     1   4 open my $null_io, '<', \'';
  1         2  
  1         20  
22              
23             sub new {
24 1     1 0 22 my ($class, @args) = @_;
25              
26 1         7 return bless {
27             no_delay => 1,
28             timeout => 300,
29             read_chunk_size => 4096,
30             server_software => 'Porbo',
31             @args,
32             }, $class;
33             }
34              
35             sub start_listen {
36 1     1 0 1 my ($self, $app) = @_;
37              
38 1         2 my @listen;
39              
40 1 50       10 if (ref($self->{listen}) eq 'ARRAY') {
41 0         0 push @listen, @{$self->{listen}};
  0         0  
42             }
43              
44 1 50 33     8 if ($self->{host} && $self->{port}) {
45 1 50       4 if ($self->{host} !~ /^http/) {
46 1         3 push @listen, "http://$self->{host}:$self->{port}";
47             }
48             }
49              
50 1         1 for my $listen (@listen) {
51 1 50       3 if ($listen =~ /^:(\d+)/) {
52 0         0 my $port = $1;
53 0         0 $listen = "http://127.0.0.1:$port";
54             }
55 1         1 push @{$self->{listen_guards}}, $self->_create_tcp_server($listen, $app);
  1         4  
56             }
57             }
58              
59             sub register_service {
60 1     1 0 1 my ($self, $app) = @_;
61              
62 1         2 $self->start_listen($app);
63              
64             $self->{exit_guard} = AE::cv {
65             # Make sure that we are not listening on a socket anymore, while
66             # other events are being flushed
67 1     1   33 delete $self->{listen_guards};
68 1         3372 };
69 1         145 $self->{exit_guard}->begin;
70             }
71              
72             sub _create_tcp_server {
73 1     1   2 my ($self, $listen, $app) = @_;
74              
75 1         5 my $url = URI->new($listen);
76              
77 1         5359 my $host = $url->host;
78 1         133 my $port = $url->port;
79 1 50       20 my $ssl = $url->scheme eq 'https' ? 1 : 0;
80              
81 1         30 my ($listen_host, $listen_port);
82              
83 1         8 return tcp_server $host, $port, $self->_accept_handler($app, \$listen_host, \$listen_port, $ssl),
84             $self->_accept_prepare_handler(\$listen_host, \$listen_port);
85             }
86              
87             sub _accept_prepare_handler {
88 1     1   1 my ($self, $listen_host_r, $listen_port_r) = @_;
89              
90             return sub {
91 1     1   158 my ( $fh, $host, $port ) = @_;
92 1         1 DEBUG && warn "Listening on $host:$port\n";
93 1         2 $$listen_host_r = $host;
94 1         1 $$listen_port_r = $port;
95 1 50       3 $self->{server_ready}->({
96             host => $host,
97             port => $port,
98             server_software => 'Porbo',
99             }) if $self->{server_ready};
100              
101 1   50     6 return $self->{backlog} || 0;
102 1         5 };
103             }
104              
105             sub _accept_handler {
106 1     1   2 my ($self, $app, $listen_host_r, $listen_port_r, $ssl) = @_;
107              
108 1         7 $app = Plack::Middleware::ContentLength->wrap($app);
109              
110             return sub {
111 2     2   22732 my ( $sock, $peer_host, $peer_port ) = @_;
112              
113 2         3 DEBUG && warn "$sock Accepted connection from $peer_host:$peer_port\n";
114 2 50       7 return unless $sock;
115 2         8 $self->{exit_guard}->begin;
116              
117 2 50       11 if ( $self->{no_delay} ) {
118 2 50       18 setsockopt($sock, IPPROTO_TCP, TCP_NODELAY, 1)
119             or die "setsockopt(TCP_NODELAY) failed:$!";
120             }
121              
122 2         4 my %args;
123 2 50       3 if ($ssl) {
124 0         0 $args{tls} = 'accept';
125 0         0 $args{tls_ctx} = {
126             key_file => $self->{ssl_key_file},
127             cert_file => $self->{ssl_cert_file},
128             };
129             }
130              
131 2         3 my $handle; $handle = AnyEvent::Handle->new(
132             fh => $sock,
133             on_error => sub {
134 1 50       97 if ($handle) {
135 1         3 $handle->destroy;
136 1         12 $self->{exit_guard}->end;
137             }
138             },
139 2         34 %args,
140             );
141              
142             $handle->push_read(line => "\015\012\015\012", sub {
143 1         43 my ($hdl, $header) = @_;
144              
145 1 50       8 my $env = {
146             SERVER_NAME => $$listen_host_r,
147             SERVER_PORT => $$listen_port_r,
148             SCRIPT_NAME => '',
149             REMOTE_ADDR => $peer_host,
150             'psgi.version' => [ 1, 0 ],
151             'psgi.errors' => *STDERR,
152             'psgi.url_scheme' => $ssl ? 'https' : 'http',
153             'psgi.nonblocking' => Plack::Util::TRUE,
154             'psgi.streaming' => Plack::Util::TRUE,
155             'psgi.run_once' => Plack::Util::FALSE,
156             'psgi.multithread' => Plack::Util::FALSE,
157             'psgi.multiprocess' => Plack::Util::FALSE,
158             'psgi.input' => undef, # will be set by _run_app()
159             'psgix.io' => $hdl->fh,
160             'psgix.input.buffered' => Plack::Util::TRUE,
161             };
162              
163 1         23 my $reqlen = parse_http_request($header."\015\012\015\012", $env);
164 1 50       151 if ($reqlen < 0) {
165 0         0 return $self->_bad_request($handle, 0);
166             }
167              
168 1 50       1 unless ( eval {
169 1         4 $self->_run_app($app, $env, $handle);
170 0         0 1;
171             }) {
172 1         581 my $disconnected = ($@ =~ /^client disconnected/);
173 1         3 $self->_bad_request($handle, $disconnected);
174             }
175 1         37 undef $handle;
176 2         196 });
177 1         31 };
178             }
179              
180             sub _run_app {
181 1     1   2 my ($self, $app, $env, $handle) = @_;
182              
183 1 50       3 unless ($env->{'psgi.input'}) {
184 1 50       2 if ($env->{CONTENT_LENGTH}) {
185 0         0 my $body;
186             $handle->on_read(sub {
187 0     0   0 $body .= $_[0]->rbuf;
188 0         0 $_[0]->rbuf = "";
189 0 0       0 if ($env->{CONTENT_LENGTH} <= length $body) {
190 0         0 open my $input, '<', \$body;
191 0         0 $env->{'psgi.input'} = $input;
192 0         0 $self->_run_app($app, $env, $handle);
193             }
194 0         0 });
195 0         0 return;
196             } else {
197 1         2 $env->{'psgi.input'} = $null_io;
198             }
199             }
200              
201 1         13 my $res = Plack::Util::run_app $app, $env;
202              
203 1 50       139 if ( ref $res eq 'ARRAY' ) {
    50          
204 0         0 $self->_write_psgi_response($handle, $res);
205             } elsif (ref $res eq 'CODE') {
206             $res->(sub {
207 0     0   0 $self->_write_psgi_response($handle, $_[0]);
208 1         4 });
209             } else {
210 0         0 croak("Unknown response type: $res");
211             }
212             }
213              
214             sub _bad_request {
215 1     1   2 my ( $self, $handle, $disconnected ) = @_;
216              
217 1         3 my $response = [
218             400,
219             [ 'Content-Type' => 'text/plain' ],
220             [ ],
221             ];
222              
223             # if client is already gone, don't try to write to it
224 1 50       3 $response = [] if $disconnected;
225              
226 1         3 $self->_write_psgi_response($handle, $response);
227              
228 1         3 return;
229             }
230              
231             sub _format_headers {
232 0     0   0 my ( $self, $status, $headers ) = @_;
233            
234 0         0 my $hdr = sprintf "HTTP/1.0 %d %s\015\012", $status, HTTP::Status::status_message($status);
235            
236 0         0 my $i = 0;
237            
238 0         0 my @delim = ("\015\012", ": ");
239            
240 0         0 foreach my $str ( @$headers ) {
241 0         0 $hdr .= $str . $delim[++$i % 2];
242             }
243            
244 0         0 $hdr .= "\015\012";
245            
246 0         0 return \$hdr;
247             }
248              
249             sub _write_psgi_response {
250 1     1   2 my ($self, $handle, $res) = @_;
251              
252 1 50       3 if (ref $res eq 'ARRAY') {
253 1 50       3 if ( scalar @$res == 0 ) {
254             # no response
255 0         0 $handle->destroy(); $self->{exit_guard}->end;
  0         0  
256 0         0 return;
257             }
258              
259 1         3 return $self->_handle_response($res, $handle);
260             } else {
261 1     1   5 no warnings 'uninitialized';
  1         2  
  1         551  
262 0         0 warn "Unknown response type: $res";
263 0         0 return $self->_write_psgi_response($handle, [ 204, [], [] ]);
264             }
265             }
266              
267             sub _handle_response {
268 1     1   2 my($self, $res, $handle) = @_;
269            
270 1         2 my @lines = (
271 1         4 "Date: @{[HTTP::Date::time2str()]}\015\012",
272             "Server: $self->{server_software}\015\012",
273             );
274            
275             Plack::Util::header_iter($res->[1], sub {
276 1     1   10 my ($k, $v) = @_;
277 1         5 push @lines, "$k: $v\015\012";
278 1         36 });
279            
280 1         5 unshift @lines, "HTTP/1.0 $res->[0] @{[ HTTP::Status::status_message($res->[0]) ]}\015\012";
  1         4  
281 1         9 push @lines, "\015\012";
282              
283 1 50       5 $self->write_all($handle, join('', @lines), $self->{timeout})
284             or return;
285              
286 1 50       8 if (defined $res->[2]) {
287 1         5 return $self->_write_body($handle, $res->[2]);
288             } else {
289             return Plack::Util::inline_object
290 0     0   0 write => sub { $self->write_all($handle, $_[0], $self->{timeout}) },
291 0     0   0 close => sub { $handle->destroy(); $self->{exit_guard}->end },
  0         0  
292 0         0 }
293             }
294              
295             sub _write_body {
296 1     1   2 my ( $self, $handle, $body ) = @_;
297              
298 1         1 my $err;
299             my $done;
300             {
301 1         1 local $@;
  1         1  
302 1         2 eval {
303 1 50       3 if (ref $body) {
304             Plack::Util::foreach(
305             $body,
306             sub {
307 0 0   0   0 $self->write_all($handle, $_[0], $self->{timeout})
308             or die "failed to send all data\n";
309             },
310 1         7 );
311             } else {
312 0 0       0 $self->write_all($handle, $body, $self->{timeout})
313             or die "failed to send all data\n";
314             }
315 1         9 $done = 1;
316 1         8 $handle->destroy(); $self->{exit_guard}->end;
  1         29  
317             };
318 1         5 $err = $@;
319             };
320 1 50       7 unless ($done) {
321 0         0 $handle->destroy(); $self->{exit_guard}->end;
  0         0  
322 0 0       0 if ($err =~ /^failed to send all data\n/) {
323 0         0 return;
324             } else {
325 0         0 die $err;
326             }
327             }
328             }
329              
330             sub write_all {
331 1     1 0 2 my ($self, $handle, $buf, $timeout) = @_;
332 1 50       3 return 0 unless defined $buf;
333 1         12 $handle->push_write($buf);
334 1         86 return length $buf;
335             }
336              
337             sub run {
338 1     1 0 1 my $self = shift;
339 1         3 $self->register_service(@_);
340              
341 1     1   62 my $w; $w = AE::signal QUIT => sub { $self->{exit_guard}->end; undef $w };
  1         14  
  1         91  
  1         11  
342 1         8 $self->{exit_guard}->recv;
343             }
344              
345             1;