File Coverage

blib/lib/Plack/Server/AnyEvent.pm
Criterion Covered Total %
statement 66 278 23.7
branch 0 80 0.0
condition 1 51 1.9
subroutine 22 52 42.3
pod 0 3 0.0
total 89 464 19.1


line stmt bran cond sub pod time code
1             package Plack::Server::AnyEvent;
2 1     1   4 use strict;
  1         2  
  1         29  
3 1     1   4 use warnings;
  1         1  
  1         32  
4 1     1   21 use 5.008_001;
  1         3  
  1         44  
5             our $VERSION = '0.04';
6              
7 1     1   5 use Scalar::Util qw(blessed weaken);
  1         1  
  1         107  
8 1     1   755 use Try::Tiny;
  1         1328  
  1         46  
9 1     1   4 use Carp;
  1         2  
  1         44  
10              
11 1     1   902 use Socket qw(IPPROTO_TCP TCP_NODELAY);
  1         3553  
  1         230  
12 1     1   754 use Errno qw(EAGAIN EINTR);
  1         1100  
  1         102  
13 1     1   840 use IO::Handle;
  1         6467  
  1         40  
14              
15 1     1   1425 use AnyEvent;
  1         6459  
  1         28  
16 1     1   4302 use AnyEvent::Handle;
  1         18339  
  1         34  
17 1     1   953 use AnyEvent::Socket;
  1         31205  
  1         163  
18 1     1   11 use AnyEvent::Util qw(WSAEWOULDBLOCK);
  1         3  
  1         44  
19              
20 1     1   1518 use HTTP::Status;
  1         4573  
  1         408  
21              
22 1     1   944 use Plack::HTTPParser qw(parse_http_request);
  1         6536  
  1         67  
23 1     1   943 use Plack::Util;
  1         6206  
  1         30  
24              
25 1     1   913 use Plack::Middleware::ContentLength;
  1         4196  
  1         33  
26 1     1   1054 use Plack::Middleware::Chunked;
  1         406  
  1         75  
27              
28             use constant HAS_AIO => !$ENV{PLACK_NO_SENDFILE} && try {
29             require AnyEvent::AIO;
30             require IO::AIO;
31             1;
32 1   33 1   7 };
  1         2  
  1         13  
33              
34 1     1   659 use Plack::Server::AnyEvent::Writer;
  1         3  
  1         1557  
35              
36             sub new {
37 0     0 0   my($class, @args) = @_;
38              
39 0           return bless {
40             host => undef,
41             port => undef,
42             no_delay => 1,
43             @args,
44             }, $class;
45             }
46              
47             sub register_service {
48 0     0 0   my($self, $app) = @_;
49              
50 0           $app = Plack::Middleware::ContentLength->wrap($app);
51             # $app = Plack::Middleware::Chunked->wrap($app);
52              
53 0           $self->{listen_guard} = $self->_create_tcp_server($app);
54             }
55              
56             sub _create_tcp_server {
57 0     0     my ( $self, $app ) = @_;
58              
59             return tcp_server $self->{host}, $self->{port}, $self->_accept_handler($app), sub {
60 0     0     my ( $fh, $host, $port ) = @_;
61 0           $self->{prepared_host} = $host;
62 0           $self->{prepared_port} = $port;
63 0           warn "Accepting requests at http://$host:$port/\n";
64 0           return 0;
65 0           };
66             }
67              
68             sub _accept_handler {
69 0     0     my ( $self, $app ) = @_;
70              
71             return sub {
72 0     0     my ( $sock, $peer_host, $peer_port ) = @_;
73              
74 0 0         return unless $sock;
75              
76 0           $self->{exit_guard}->begin;
77              
78 0 0         if ( $self->{no_delay} ) {
79 0 0         setsockopt($sock, IPPROTO_TCP, TCP_NODELAY, 1)
80             or die "setsockopt(TCP_NODELAY) failed:$!";
81             }
82              
83 0           my $headers = "";
84              
85             my $try_parse = sub {
86 0 0         if ( $self->_try_read_headers($sock, $headers) ) {
87 0           my $env = {
88             SERVER_PORT => $self->{prepared_port},
89             SERVER_NAME => $self->{prepared_host},
90             SCRIPT_NAME => '',
91             'psgi.version' => [ 1, 0 ],
92             'psgi.errors' => *STDERR,
93             'psgi.url_scheme' => 'http',
94             'psgi.nonblocking' => Plack::Util::TRUE,
95             'psgi.streaming' => Plack::Util::TRUE,
96             'psgi.run_once' => Plack::Util::FALSE,
97             'psgi.multithread' => Plack::Util::FALSE,
98             'psgi.multiprocess' => Plack::Util::FALSE,
99             'psgi.input' => $sock,
100             'REMOTE_ADDR' => $peer_host,
101             };
102              
103 0           my $reqlen = parse_http_request($headers, $env);
104              
105 0 0         if ( $reqlen < 0 ) {
106 0           die "bad request";
107             } else {
108 0           return $env;
109             }
110             }
111              
112 0           return;
113 0           };
114              
115 0           local $@;
116 0 0         unless ( eval {
117 0 0         if ( my $env = $try_parse->() ) {
118             # the request data is already available, no need to parse more
119 0           $self->_run_app($app, $env, $sock);
120             } else {
121             # there's not yet enough data to parse the request,
122             # set up a watcher
123 0           $self->_create_req_parsing_watcher( $sock, $try_parse, $app );
124             };
125              
126 0           1;
127             }) {
128 0           $self->_bad_request($sock);
129             }
130 0           };
131             }
132              
133             # returns a closure that tries to parse
134             # this is not a method because it needs a buffer per socket
135             sub _try_read_headers {
136 0     0     my ( $self, $sock, undef ) = @_;
137              
138             # FIXME add a timer to manage read timeouts
139 0           local $/ = "\012";
140              
141 0           read_more: for my $headers ( $_[2] ) {
142 0 0 0       if ( defined(my $line = <$sock>) ) {
    0 0        
      0        
143 0           $headers .= $line;
144              
145 0 0 0       if ( $line eq "\015\012" or $line eq "\012" ) {
146             # got an empty line, we're done reading the headers
147 0           return 1;
148             } else {
149             # try to read more lines using buffered IO
150 0           redo read_more;
151             }
152             } elsif ($! and $! != EAGAIN && $! != EINTR && $! != WSAEWOULDBLOCK ) {
153 0           die $!;
154             }
155             }
156              
157             # did not read to end of req, wait for more data to arrive
158 0           return;
159             }
160              
161             sub _create_req_parsing_watcher {
162 0     0     my ( $self, $sock, $try_parse, $app ) = @_;
163              
164 0           my $headers_io_watcher;
165             $headers_io_watcher = AE::io $sock, 0, sub {
166             try {
167 0 0         if ( my $env = $try_parse->() ) {
168 0           undef $headers_io_watcher;
169 0           $self->_run_app($app, $env, $sock);
170             }
171             } catch {
172 0           undef $headers_io_watcher;
173 0           $self->_bad_request($sock);
174             }
175 0     0     };
  0            
176             }
177              
178             sub _bad_request {
179 0     0     my ( $self, $sock ) = @_;
180              
181 0           $self->_write_psgi_response(
182             $sock,
183             [
184             400,
185             [ 'Content-Type' => 'text/plain' ],
186             [ ],
187             ],
188             );
189              
190 0           return;
191             }
192              
193             sub _run_app {
194 0     0     my($self, $app, $env, $sock) = @_;
195              
196 0           my $res = Plack::Util::run_app $app, $env;
197              
198 0 0 0       if ( ref $res eq 'ARRAY' ) {
    0          
    0          
199 0           $self->_write_psgi_response($sock, $res);
200             } elsif ( blessed($res) and $res->isa("AnyEvent::CondVar") ) {
201 0     0     $res->cb(sub { $self->_write_psgi_response($sock, shift->recv) });
  0            
202             } elsif ( ref $res eq 'CODE' ) {
203             $res->(
204             sub {
205 0     0     my $res = shift;
206              
207 0 0         if ( @$res < 2 ) {
    0          
208 0           croak "Insufficient arguments";
209             } elsif ( @$res == 2 ) {
210 0           my ( $status, $headers ) = @$res;
211              
212 0           $self->_flush($sock);
213              
214 0           my $writer = Plack::Server::AnyEvent::Writer->new($sock, $self->{exit_guard});
215              
216 0           my $buf = $self->_format_headers($status, $headers);
217 0           $writer->write($$buf);
218              
219 0           return $writer;
220             } else {
221 0           my ( $status, $headers, $body, $post ) = @$res;
222 0           my $cv = $self->_write_psgi_response($sock, [ $status, $headers, $body ]);
223 0 0         $cv->cb(sub { $post->() }) if $post;
  0            
224             }
225             },
226 0           $sock,
227             );
228             } else {
229 0           croak("Unknown response type: $res");
230             }
231             }
232              
233             sub _write_psgi_response {
234 0     0     my ( $self, $sock, $res ) = @_;
235              
236 0 0         if ( ref $res eq 'ARRAY' ) {
237 0 0         if ( scalar @$res == 0 ) {
238             # no response
239 0           $self->{exit_guard}->end;
240 0           return;
241             }
242              
243 0           my ( $status, $headers, $body ) = @$res;
244              
245 0           my $cv = AE::cv;
246              
247             $self->_write_headers( $sock, $status, $headers )->cb(sub {
248 0     0     local $@;
249 0 0         if ( eval { $_[0]->recv; 1 } ) {
  0            
  0            
250             $self->_write_body($sock, $body)->cb(sub {
251 0           $self->{exit_guard}->end;
252 0           local $@;
253 0 0         eval { $cv->send($_[0]->recv); 1 } or $cv->croak($@);
  0            
  0            
254 0           });
255             }
256 0           });
257              
258 0           return $cv;
259             } else {
260 1     1   7 no warnings 'uninitialized';
  1         3  
  1         374  
261 0           warn "Unknown response type: $res";
262 0           return $self->_write_psgi_response($sock, [ 204, [], [] ]);
263             }
264             }
265              
266             sub _write_headers {
267 0     0     my ( $self, $sock, $status, $headers ) = @_;
268              
269 0           $self->_write_buf( $sock, $self->_format_headers($status, $headers) );
270             }
271              
272             sub _format_headers {
273 0     0     my ( $self, $status, $headers ) = @_;
274              
275 0           my $hdr = sprintf "HTTP/1.0 %d %s\015\012", $status, HTTP::Status::status_message($status);
276              
277 0           my $i = 0;
278              
279 0           my @delim = ("\015\012", ": ");
280              
281 0           foreach my $str ( @$headers ) {
282 0           $hdr .= $str . $delim[++$i % 2];
283             }
284              
285 0           $hdr .= "\015\012";
286              
287 0           return \$hdr;
288             }
289              
290             # this flushes just the output buffer, not the input buffer (unlike
291             # $handle->flush)
292             sub _flush {
293 0     0     my ( $self, $sock ) = @_;
294              
295 0           local $| = 1;
296 0           print $sock '';
297             }
298              
299             # helper routine, similar to push write, but respects buffering, and refcounts
300             # itself
301             sub _write_buf {
302 0     0     my($self, $socket, $data) = @_;
303              
304 1     1   7 no warnings 'uninitialized';
  1         8  
  1         1693  
305              
306             # try writing immediately
307 0 0         if ( (my $written = syswrite($socket, $$data)) < length($$data) ) {
    0          
308 0   0       my $done = defined(wantarray) && AE::cv;
309              
310             # either the write failed or was incomplete
311              
312 0 0 0       if ( !defined($written) and $! != EAGAIN && $! != EINTR && $! != WSAEWOULDBLOCK) {
      0        
      0        
313             # a real write error occured, like EPIPE
314 0 0         $done->croak($!) if $done;
315 0           return $done;
316             }
317              
318             # the write was either incomplete or a non fatal error occured, so we
319             # need to set up an IO watcher to wait until we can properly write
320              
321 0           my $length = length($$data);
322              
323 0           my $write_watcher;
324             $write_watcher = AE::io $socket, 1, sub {
325 0           write_more: {
326 0     0     my $out = syswrite($socket, $$data, $length - $written, $written);
327              
328 0 0 0       if ( defined($out) ) {
    0 0        
329 0           $written += $out;
330              
331 0 0         if ( $written == $length ) {
332 0           undef $write_watcher;
333 0 0         $done->send(1) if $done;
334             } else {
335 0           redo write_more;
336             }
337             } elsif ($! != EAGAIN && $! != EINTR && $! != WSAEWOULDBLOCK) {
338 0 0         $done->croak($!) if $done;
339 0           undef $write_watcher;
340             }
341             }
342 0           };
343              
344 0           return $done;
345             } elsif ( defined wantarray ) {
346 0           my $done = AE::cv;
347 0           $done->send(1);
348 0           return $done;
349             }
350             }
351              
352             sub _write_body {
353 0     0     my ( $self, $sock, $body ) = @_;
354              
355 0 0 0       if ( ref $body eq 'ARRAY' ) {
    0          
    0          
356 0           my $buf = join "", @$body;
357 0           return $self->_write_buf($sock, \$buf);
358             } elsif ( Plack::Util::is_real_fh($body) ) {
359             # real handles use nonblocking IO
360             # either AIO or using watchers, with sendfile or with copying IO
361 0           $self->_write_real_fh($sock, $body);
362             } elsif ( blessed($body) and $body->can("string_ref") ) {
363             # optimize IO::String to not use its incredibly slow getline
364 0 0         if ( my $pos = $body->tell ) {
365 0           my $str = substr ${ $body->string_ref }, $pos;
  0            
366 0           return $self->_write_buf($sock, \$str);
367             } else {
368 0           return $self->_write_buf($sock, $body->string_ref);
369             }
370             } else {
371             # like Plack::Util::foreach, but nonblocking on the output
372             # handle
373              
374 0           my $handle = AnyEvent::Handle->new( fh => $sock );
375              
376 0           my $ret = AE::cv;
377              
378             $handle->on_error(sub {
379 0     0     my $err = $_[2];
380 0           $handle->destroy;
381 0           $ret->send($err);
382 0           });
383              
384             $handle->on_drain(sub {
385 0     0     local $/ = \4096;
386 0 0         if ( defined( my $buf = $body->getline ) ) {
    0          
387 0           $handle->push_write($buf);
388             } elsif ( $! ) {
389 0           $ret->croak($!);
390 0           $handle->destroy;
391             } else {
392 0           $body->close;
393             $handle->on_drain(sub {
394 0           shutdown $handle->fh, 1;
395 0           $handle->destroy;
396 0           $ret->send(1);
397 0           });
398             }
399 0           });
400              
401 0           return $ret;
402             }
403             }
404              
405             # when the body handle is a real filehandle we use this routine, which is more
406             # careful not to block when reading the response too
407              
408             # FIXME support only reading $length bytes from $body, instead of until EOF
409             # FIXME use len = 0 param to sendfile
410             # FIXME use Sys::Sendfile in nonblocking mode if AIO is not available
411             # FIXME test sendfile on non file backed handles
412             # FIXME this is actually pretty broken on linux
413             sub _write_real_fh {
414 0     0     my ( $self, $sock, $body ) = @_;
415              
416 0 0 0       if ( HAS_AIO and -s $body ) {
417 0           my $cv = AE::cv;
418 0           my $offset = 0;
419 0           my $length = -s $body;
420 0           $sock->blocking(1);
421 0           my $sendfile; $sendfile = sub {
422             IO::AIO::aio_sendfile( $sock, $body, $offset, $length - $offset, sub {
423 0           my $ret = shift;
424 0 0         $offset += $ret if $ret > 0;
425 0 0 0       if ($offset >= $length || ($ret == -1 && ! ($! == EAGAIN || $! == EINTR))) {
      0        
      0        
426 0 0         if ( $ret == -1 ) {
427 0           $cv->croak($!);
428             } else {
429 0           $cv->send(1);
430             }
431              
432 0           undef $sendfile;
433 0           undef $sock;
434             } else {
435 0           $sendfile->();
436             }
437 0     0     });
438 0           };
439 0           $sendfile->();
440 0           return $cv;
441             } else {
442             # $body is a real filehandle, so set up a watcher for it
443             # this is basically sendfile in userspace
444 0           my $sock_handle = AnyEvent::Handle->new( fh => $sock );
445 0           my $body_handle = AnyEvent::Handle->new( fh => $body );
446              
447 0           my $cv = AE::cv;
448              
449             my $err = sub {
450 0     0     $cv->croak($_[2]);
451              
452 0           for ( $sock_handle, $body_handle ) {
453 0           $_->destroy;
454             }
455 0           };
456              
457 0           $sock_handle->on_error($err);
458 0           $body_handle->on_error($err);
459              
460             $body_handle->on_eof(sub {
461 0     0     $body_handle->destroy;
462             $sock_handle->on_drain(sub {
463 0           shutdown $sock_handle->fh, 1;
464 0           $sock_handle->destroy;
465 0           $cv->send(1);
466 0           });
467 0           });
468              
469             $sock_handle->on_drain(sub {
470             $body_handle->push_read(sub {
471 0           $sock_handle->push_write($_[0]{rbuf});
472 0           $_[0]{rbuf} = '';
473 0     0     });
474 0           });
475              
476 0           return $cv;
477             }
478             }
479              
480             sub run {
481 0     0 0   my $self = shift;
482 0           $self->register_service(@_);
483              
484 0           my $exit = $self->{exit_guard} = AnyEvent->condvar;
485 0           $exit->begin;
486              
487 0     0     my $w; $w = AE::signal QUIT => sub { $exit->end; undef $w };
  0            
  0            
  0            
488              
489 0           $exit->recv;
490             }
491              
492             # ex: set sw=4 et:
493              
494             1;
495             __END__