File Coverage

blib/lib/Net/Async/HTTP/Connection.pm
Criterion Covered Total %
statement 326 348 93.6
branch 143 186 76.8
condition 62 84 73.8
subroutine 36 37 97.3
pod 4 9 44.4
total 571 664 85.9


line stmt bran cond sub pod time code
1             # You may distribute under the terms of either the GNU General Public License
2             # or the Artistic License (the same terms as Perl itself)
3             #
4             # (C) Paul Evans, 2008-2023 -- leonerd@leonerd.org.uk
5              
6             package Net::Async::HTTP::Connection 0.49;
7              
8 40     40   496 use v5.14;
  40         128  
9 40     40   185 use warnings;
  40         72  
  40         862  
10              
11 40     40   171 use Carp;
  40         72  
  40         2084  
12              
13 40     40   242 use base qw( IO::Async::Stream );
  40         102  
  40         20057  
14             IO::Async::Stream->VERSION( '0.59' ); # ->write( ..., on_write )
15              
16 40     40   764159 use Net::Async::HTTP::StallTimer;
  40         101  
  40         1117  
17              
18 40     40   16136 use HTTP::Response;
  40         857823  
  40         1929  
19              
20             my $CRLF = "\x0d\x0a"; # More portable than \r\n
21              
22 40     40   316 use Struct::Dumb;
  40         86  
  40         329  
23             struct RequestContext => [qw( req on_read stall_timer resp_header resp_bytes on_done is_done f )],
24             named_constructor => 1;
25              
26             # Detect whether HTTP::Message properly trims whitespace in header values. If
27             # it doesn't, we have to deploy a workaround to fix them up.
28             # https://rt.cpan.org/Ticket/Display.html?id=75224
29 40     40   3468 use constant HTTP_MESSAGE_TRIMS_LWS => HTTP::Message->parse( "Name: value " )->header("Name") eq "value";
  40         78  
  40         197  
30              
31             =head1 NAME
32              
33             C<Net::Async::HTTP::Connection> - HTTP client protocol handler
34              
35             =head1 DESCRIPTION
36              
37             This class provides a connection to a single HTTP server, and is used
38             internally by L<Net::Async::HTTP>. It is not intended for general use.
39              
40             =cut
41              
42             sub _init
43             {
44 105     105   1486 my $self = shift;
45 105         562 $self->SUPER::_init( @_ );
46              
47 105         2317 $self->{requests_in_flight} = 0;
48             }
49              
50             sub configure
51             {
52 307     307 1 282129 my $self = shift;
53 307         961 my %params = @_;
54              
55 307         696 foreach (qw( pipeline max_in_flight ready_queue decode_content is_proxy )) {
56 1535 100       3278 $self->{$_} = delete $params{$_} if exists $params{$_};
57             }
58              
59 307 100       742 if( my $on_closed = $params{on_closed} ) {
60             $params{on_closed} = sub {
61 69     69   13334 my $self = shift;
62              
63 69         393 $self->debug_printf( "CLOSED in-flight=$self->{requests_in_flight}" );
64              
65 69         341 $self->error_all( "Connection closed" );
66              
67 69         146 undef $self->{ready_queue};
68 69         210 $on_closed->( $self );
69 105         551 };
70             }
71              
72 307 50       717 croak "max_in_flight parameter required, may be zero" unless defined $self->{max_in_flight};
73              
74 307         1193 $self->SUPER::configure( %params );
75             }
76              
77             sub should_pipeline
78             {
79 216     216 0 541 my $self = shift;
80             return $self->{pipeline} &&
81             $self->{can_pipeline} &&
82 216   66     2532 ( !$self->{max_in_flight} || $self->{requests_in_flight} < $self->{max_in_flight} );
83             }
84              
85             sub connect
86             {
87 105     105 1 198 my $self = shift;
88 105         419 my %args = @_;
89              
90 105 100       433 my $key = defined $args{path} ? $args{path} : "$args{host}:$args{service}";
91 105         683 $self->debug_printf( "CONNECT $key" );
92              
93 105 50       507 defined wantarray or die "VOID ->connect";
94              
95             $self->SUPER::connect(
96             socktype => "stream",
97             %args
98             )->on_done( sub {
99 97     97   32382 $self->debug_printf( "CONNECTED" );
100 105         559 });
101             }
102              
103             sub ready
104             {
105 219     219 0 366 my $self = shift;
106              
107 219 100       570 my $queue = $self->{ready_queue} or return;
108              
109 196 100 66     459 if( $self->should_pipeline ) {
    100          
110 84         399 $self->debug_printf( "READY pipelined" );
111 84   100     462 while( @$queue && $self->should_pipeline ) {
112 19         55 my $ready = shift @$queue;
113 19         96 my $f = $ready->future;
114 19 100       149 next if $f->is_cancelled;
115              
116 18 100       132 $ready->connecting and $ready->connecting->cancel;
117              
118 18         164 $f->done( $self );
119             }
120             }
121             elsif( @$queue and $self->is_idle ) {
122 104         320 $self->debug_printf( "READY non-pipelined" );
123 104         417 while( @$queue ) {
124 104         208 my $ready = shift @$queue;
125 104         347 my $f = $ready->future;
126 104 50       819 next if $f->is_cancelled;
127              
128 104 100       672 $ready->connecting and $ready->connecting->cancel;
129              
130 104         881 $f->done( $self );
131 104         3772 last;
132             }
133             }
134             else {
135 8 50       22 $self->debug_printf( "READY cannot-run queue=%d idle=%s",
136             scalar @$queue, $self->is_idle ? "Y" : "N");
137             }
138             }
139              
140             sub is_idle
141             {
142 165     165 0 286 my $self = shift;
143 165         676 return $self->{requests_in_flight} == 0;
144             }
145              
146             sub on_read
147             {
148 260     260 1 441879 my $self = shift;
149 260         542 my ( $buffref, $closed ) = @_;
150              
151 260         1635 while( my $head = $self->{request_queue}[0]) {
152 230 100 50     1371 shift @{ $self->{request_queue} } and next if $head->is_done;
  1         8  
153              
154 229 100       1458 $head->stall_timer->reset if $head->stall_timer;
155              
156 229         1834 my $ret = $head->on_read->( $self, $buffref, $closed, $head );
157              
158 228 100       582 if( defined $ret ) {
159 107 100       498 return $ret if !ref $ret;
160              
161 74         287 $head->on_read = $ret;
162 74         695 return 1;
163             }
164              
165 121 50       341 $head->is_done or die "ARGH: undef return without being marked done";
166              
167 121         1081 shift @{ $self->{request_queue} };
  121         259  
168 121 100 100     673 return 1 if !$closed and length $$buffref;
169 119         6284 return;
170             }
171              
172             # Reinvoked after switch back to baseline, but may be idle again
173 31 100 66     151 return if $closed or !length $$buffref;
174              
175 1         10 $self->invoke_error( "Spurious on_read of connection while idle",
176             http_connection => read => $$buffref );
177 1         37 $$buffref = "";
178             }
179              
180             sub on_write_eof
181             {
182 0     0 1 0 my $self = shift;
183 0         0 $self->error_all( "Connection closed", http => undef, undef );
184             }
185              
186             sub error_all
187             {
188 69     69 0 142 my $self = shift;
189              
190 69         124 while( my $head = shift @{ $self->{request_queue} } ) {
  105         1571  
191 36 100 100     299 $head->f->fail( @_ ) unless $head->is_done or $head->f->is_ready;
192             }
193             }
194              
195             sub request
196             {
197 136     136 0 278 my $self = shift;
198 136         563 my %args = @_;
199              
200 136 50       478 my $on_header = $args{on_header} or croak "Expected 'on_header' as a CODE ref";
201              
202 136         256 my $req = $args{request};
203 136 50 33     1179 ref $req and $req->isa( "HTTP::Request" ) or croak "Expected 'request' as a HTTP::Request reference";
204              
205 136         502 $self->debug_printf( "REQUEST %s %s", $req->method, $req->uri );
206              
207 136         2537 my $request_body = $args{request_body};
208 136         280 my $expect_continue = !!$args{expect_continue};
209              
210 136         338 my $method = $req->method;
211              
212 136 100 100     1934 if( $method eq "POST" or $method eq "PUT" or length $req->content ) {
      100        
213 13         104 $req->init_header( "Content-Length", length $req->content );
214             }
215              
216 136 100       2486 if( $expect_continue ) {
217 1         9 $req->init_header( "Expect", "100-continue" );
218             }
219              
220 136 100       381 if( $self->{decode_content} ) {
221             #$req->init_header( "Accept-Encoding", Net::Async::HTTP->can_decode )
222 2         8 $req->init_header( "Accept-Encoding", "gzip" );
223             }
224              
225 136         543 my $f = $self->loop->new_future
226             ->set_label( "$method " . $req->uri );
227              
228             # TODO: Cancelling a request Future shouldn't necessarily close the socket
229             # if we haven't even started writing the request yet. But we can't know
230             # that currently.
231             $f->on_cancel( sub {
232 5     5   5568 $self->debug_printf( "CLOSE on_cancel" );
233 5         44 $self->close_now;
234 136         7320 });
235              
236 136         2286 my $stall_timer;
237 136 100       413 if( $args{stall_timeout} ) {
238             $stall_timer = Net::Async::HTTP::StallTimer->new(
239             delay => $args{stall_timeout},
240 4         38 future => $f,
241             );
242 4         220 $self->add_child( $stall_timer );
243             # Don't start it yet
244              
245             my $remove_timer = sub {
246 4 50   4   368 $self->remove_child( $stall_timer ) if $stall_timer;
247 4         498 undef $stall_timer;
248 4         284 };
249              
250 4         16 $f->on_ready( $remove_timer );
251             }
252              
253 136         748 push @{ $self->{request_queue} }, my $ctx = RequestContext(
254             req => $req,
255             on_read => undef, # will be set later
256             stall_timer => $stall_timer,
257             resp_header => undef, # will be set later
258             resp_bytes => 0,
259             on_done => $args{on_done},
260 136         310 is_done => 0,
261             f => $f,
262             );
263              
264 136         5221 my $on_body_write;
265 136 100 100     672 if( $stall_timer or $args{on_body_write} ) {
266 5         12 my $inner_on_body_write = $args{on_body_write};
267 5         8 my $written = 0;
268             $on_body_write = sub {
269 10 50   10   9793 $stall_timer->reset if $stall_timer;
270 10 50       30 $inner_on_body_write->( $written += $_[1] ) if $inner_on_body_write;
271 5         20 };
272             }
273              
274             my $write_request_body = defined $request_body ? sub {
275 4     4   10 my ( $self ) = @_;
276 4         36 $self->write( $request_body,
277             on_write => $on_body_write
278             );
279 136 100       450 } : undef;
280              
281             # Unless the request method is CONNECT, or we are connecting to a
282             # non-transparent proxy, the URL is not allowed to contain
283             # an authority; only path
284             # Take a copy of the headers since we'll be hacking them up
285 136         439 my $headers = $req->headers->clone;
286 136         2074 my $path;
287 136 50       369 if( $method eq "CONNECT" ) {
288 0         0 $path = $req->uri->as_string;
289             }
290             else {
291 136         356 my $uri = $req->uri;
292 136 100       914 if ( $self->{is_proxy} ) {
293 2         24 $path = $uri->as_string;
294             }
295             else {
296 134         665 $path = $uri->path_query;
297 134 100       2137 $path = "/$path" unless $path =~ m{^/};
298             }
299 136         439 my $authority = $uri->authority;
300 136 100 100     2348 if( defined $authority and
301             my ( $user, $pass, $host ) = $authority =~ m/^(.*?):(.*)@(.*)$/ ) {
302 2         10 $headers->init_header( Host => $host );
303 2         55 $headers->authorization_basic( $user, $pass );
304             }
305             else {
306 134         531 $headers->init_header( Host => $authority );
307             }
308             }
309              
310 136   100     5010 my $protocol = $req->protocol || "HTTP/1.1";
311 136         1810 my @headers = ( "$method $path $protocol" );
312             $headers->scan( sub {
313 326     326   4484 my ( $name, $value ) = @_;
314 326         509 $name =~ s/^://; # non-canonical header
315 326         875 push @headers, "$name: $value";
316 136         967 } );
317              
318 136 100       764 $stall_timer->start if $stall_timer;
319 136 100       917 $stall_timer->reason = "writing request" if $stall_timer;
320              
321 136 100   3   348 my $on_header_write = $stall_timer ? sub { $stall_timer->reset } : undef;
  3         1407  
322              
323 136         1038 $self->write( join( $CRLF, @headers ) .
324             $CRLF . $CRLF,
325             on_write => $on_header_write );
326              
327 136 100       22997 $self->write( $req->content,
328             on_write => $on_body_write ) if length $req->content;
329 136 100 100     2202 $write_request_body->( $self ) if $write_request_body and !$expect_continue;
330              
331             $self->write( "", on_flush => sub {
332 3 50   3   1423 return unless $stall_timer; # test again in case it was cancelled in the meantime
333 3         14 $stall_timer->reset;
334 3         767 $stall_timer->reason = "waiting for response";
335 136 100       480 }) if $stall_timer;
336              
337 136         453 $self->{requests_in_flight}++;
338              
339             $ctx->on_read = $self->_mk_on_read_header(
340 136 100       790 $args{previous_response}, $expect_continue ? $write_request_body : undef, $on_header
341             );
342              
343 136         1508 return $f;
344             }
345              
346             sub _mk_on_read_header
347             {
348 136     136   216 shift; # $self
349 136         422 my ( $previous_response, $write_request_body, $on_header ) = @_;
350              
351             sub {
352 128     128   740 my ( $self, $buffref, $closed, $ctx ) = @_;
353              
354 128         362 my $req = $ctx->req;
355 128         593 my $stall_timer = $ctx->stall_timer;
356 128         611 my $f = $ctx->f;
357              
358 128 100       598 if( $stall_timer ) {
359 2         6 $stall_timer->reason = "receiving response header";
360 2         6 $stall_timer->reset;
361             }
362              
363 128 100 100     1444 if( length $$buffref >= 4 and $$buffref !~ m/^HTTP/ ) {
364 1         4 $self->debug_printf( "ERROR fail" );
365 1 50       5 $f->fail( "Did not receive HTTP response from server", http => undef, $req ) unless $f->is_cancelled;
366 1         34 $self->close_now;
367             }
368              
369 128 100       1545 unless( $$buffref =~ s/^(.*?$CRLF$CRLF)//s ) {
370 3 100       23 if( $closed ) {
371 1         5 $self->debug_printf( "ERROR closed" );
372 1 50       5 $f->fail( "Connection closed while awaiting header", http => undef, $req ) unless $f->is_cancelled;
373 1         108 $self->close_now;
374             }
375 3         22 return 0;
376             }
377              
378 125         539 $ctx->resp_bytes += $+[0];
379              
380 125         1509 my $header = HTTP::Response->parse( $1 );
381             # HTTP::Response doesn't strip the \rs from this
382 125         24296 ( my $status_line = $header->status_line ) =~ s/\r$//;
383              
384 125         1349 $ctx->resp_header = $header;
385              
386 125         616 unless( HTTP_MESSAGE_TRIMS_LWS ) {
387 125         193 my @headers;
388             $header->scan( sub {
389 257         5992 my ( $name, $value ) = @_;
390 257         963 s/^\s+//, s/\s+$// for $value;
391 257         710 push @headers, $name => $value;
392 125         1191 } );
393 125 100       1222 $header->header( @headers ) if @headers;
394             }
395              
396 125         8958 my $protocol = $header->protocol;
397 125 100 66     1931 if( $protocol =~ m{^HTTP/1\.(\d+)$} and $1 >= 1 ) {
398 118         313 $self->{can_pipeline} = 1;
399             }
400              
401 125 100       386 if( $header->code =~ m/^1/ ) { # 1xx is not a final response
402 1         15 $self->debug_printf( "HEADER [provisional] %s", $status_line );
403 1 50       14 $write_request_body->( $self ) if $write_request_body;
404 1         176 return 1;
405             }
406              
407 124         1562 $header->request( $req );
408 124 100       1249 $header->previous( $previous_response ) if $previous_response;
409              
410 124         484 $self->debug_printf( "HEADER %s", $status_line );
411              
412 124         515 my $on_body_chunk = $on_header->( $header );
413              
414 124         508 my $code = $header->code;
415              
416 124         1273 my $content_encoding = $header->header( "Content-Encoding" );
417              
418 124         4597 my $decoder;
419 124 100 66     404 if( $content_encoding and
420             $decoder = Net::Async::HTTP->can_decode( $content_encoding ) ) {
421 2         16 $header->init_header( "X-Original-Content-Encoding" => $header->remove_header( "Content-Encoding" ) );
422             }
423              
424             # can_pipeline is set for HTTP/1.1 or above; presume it can keep-alive if set
425 124   66     506 my $connection_close = lc( $header->header( "Connection" ) || ( $self->{can_pipeline} ? "keep-alive" : "close" ) )
426             eq "close";
427              
428 124 100 50     4960 if( $connection_close ) {
    50          
429 23         51 $self->{max_in_flight} = 1;
430             }
431             elsif( defined( my $keep_alive = lc( $header->header("Keep-Alive") || "" ) ) ) {
432 101         4969 my ( $max ) = ( $keep_alive =~ m/max=(\d+)/ );
433 101 50 33     316 $self->{max_in_flight} = $max if $max && $max < $self->{max_in_flight};
434             }
435              
436             my $on_more = sub {
437 88         183 my ( $chunk ) = @_;
438              
439 88 50 66     255 if( $decoder and not eval { $chunk = $decoder->( $chunk ); 1 } ) {
  2         5  
  2         7  
440 0         0 $self->debug_printf( "ERROR decode failed" );
441 0         0 $f->fail( "Decode error $@", http => undef, $req );
442 0         0 $self->close;
443 0         0 return undef;
444             }
445              
446 88         319 $on_body_chunk->( $chunk );
447              
448 88         1491 return 1;
449 124         724 };
450             my $on_done = sub {
451 122         251 my ( $ctx ) = @_;
452              
453 122         468 $ctx->is_done++;
454              
455             # TODO: IO::Async probably ought to do this. We need to fire the
456             # on_closed event _before_ calling on_body_chunk, to clear the
457             # connection cache in case another request comes - e.g. HEAD->GET
458 122 100       796 $self->close if $connection_close;
459              
460 122         498 my $final;
461 122 50 66     389 if( $decoder and not eval { $final = $decoder->(); 1 } ) {
  2         5  
  2         6  
462 0         0 $self->debug_printf( "ERROR decode failed" );
463 0         0 $f->fail( "Decode error $@", http => undef, $req );
464 0         0 $self->close;
465 0         0 return undef;
466             }
467              
468 122 50 66     379 $on_body_chunk->( $final ) if defined $final and length $final;
469              
470 122         341 my $response = $on_body_chunk->();
471 122 50       349 my $e = eval { $f->done( $response ) unless $f->is_cancelled; 1 } ? undef : $@;
  122 100       440  
  120         6808  
472              
473 122 50       593 $ctx->on_done->( $ctx ) if $ctx->on_done;
474              
475 122         1150 $self->{requests_in_flight}--;
476 122         604 $self->debug_printf( "DONE remaining in-flight=$self->{requests_in_flight}" );
477 122         546 $self->ready;
478              
479 122 100       1021 if( defined $e ) {
480 2         6 chomp $e;
481 2         13 $self->invoke_error( $e, perl => );
482             # This might not return, if it top-level croaks
483             }
484              
485 121         962 return undef; # Finished
486 124         681 };
487              
488             # RFC 2616 says "HEAD" does not have a body, nor do any 1xx codes, nor
489             # 204 (No Content) nor 304 (Not Modified)
490 124 100 66     488 if( $req->method eq "HEAD" or $code =~ m/^1..$/ or $code eq "204" or $code eq "304" ) {
      66        
      66        
491 3         49 $self->debug_printf( "BODY done [none]" );
492 3         12 return $on_done->( $ctx );
493             }
494              
495 121         2382 my $transfer_encoding = $header->header( "Transfer-Encoding" );
496 121         4617 my $content_length = $header->content_length;
497              
498 121 100 66     5074 if( defined $transfer_encoding and $transfer_encoding eq "chunked" ) {
    100          
499 4         13 $self->debug_printf( "BODY chunks" );
500              
501 4 50       17 $stall_timer->reason = "receiving body chunks" if $stall_timer;
502 4         12 return $self->_mk_on_read_chunked( $on_more, $on_done );
503             }
504             elsif( defined $content_length ) {
505 104         641 $self->debug_printf( "BODY length $content_length" );
506              
507 104 100       473 if( $content_length == 0 ) {
508 50         155 $self->debug_printf( "BODY done [length=0]" );
509 50         212 return $on_done->( $ctx );
510             }
511              
512 54 100       242 $stall_timer->reason = "receiving body" if $stall_timer;
513 54         197 return $self->_mk_on_read_length( $content_length, $on_more, $on_done );
514             }
515             else {
516 13         55 $self->debug_printf( "BODY until EOF" );
517              
518 13 50       62 $stall_timer->reason = "receiving body until EOF" if $stall_timer;
519 13         42 return $self->_mk_on_read_until_eof( $on_more, $on_done );
520             }
521 136         1613 };
522             }
523              
524             sub _mk_on_read_chunked
525             {
526 4     4   8 shift; # $self
527 4         16 my ( $on_more, $on_done ) = @_;
528              
529 4         5 my $chunk_length;
530              
531             sub {
532 16     16   63 my ( $self, $buffref, $closed, $ctx ) = @_;
533              
534 16         34 my $req = $ctx->req;
535 16         60 my $f = $ctx->f;
536              
537 16 100 66     168 if( !defined $chunk_length and $$buffref =~ s/^(.*?)$CRLF// ) {
538 10         24 my $header = $1;
539 10         23 $ctx->resp_bytes += $+[0];
540              
541             # Chunk header
542 10 100       80 unless( $header =~ s/^([A-Fa-f0-9]+).*// ) {
543 1 50       4 $f->fail( "Corrupted chunk header", http => undef, $req ) unless $f->is_cancelled;
544 1         37 $self->close_now;
545 1         13 return 0;
546             }
547              
548 9         21 $chunk_length = hex( $1 );
549 9 100       27 return 1 if $chunk_length;
550              
551 3         14 return $self->_mk_on_read_chunk_trailer( $req, $on_more, $on_done, $f );
552             }
553              
554             # Chunk is followed by a CRLF, which isn't counted in the length;
555 6 50 33     36 if( defined $chunk_length and length( $$buffref ) >= $chunk_length + 2 ) {
556             # Chunk body
557 6         17 my $chunk = substr( $$buffref, 0, $chunk_length, "" );
558 6         14 $ctx->resp_bytes += length $chunk;
559              
560 6 50       59 unless( $$buffref =~ s/^$CRLF// ) {
561 0         0 $self->debug_printf( "ERROR chunk without CRLF" );
562 0 0       0 $f->fail( "Chunk of size $chunk_length wasn't followed by CRLF", http => undef, $req ) unless $f->is_cancelled;
563 0         0 $self->close;
564             }
565              
566 6         17 $ctx->resp_bytes += $+[0];
567              
568 6         30 undef $chunk_length;
569              
570 6         12 return $on_more->( $chunk );
571             }
572              
573 0 0       0 if( $closed ) {
574 0         0 $self->debug_printf( "ERROR closed" );
575 0 0       0 $f->fail( "Connection closed while awaiting chunk", http => undef, $req ) unless $f->is_cancelled;
576             }
577 0         0 return 0;
578 4         29 };
579             }
580              
581             sub _mk_on_read_chunk_trailer
582             {
583 3     3   6 shift; # $self
584 3         8 my ( undef, $on_more, $on_done ) = @_;
585              
586 3         7 my $trailer = "";
587              
588             sub {
589 3     3   14 my ( $self, $buffref, $closed, $ctx ) = @_;
590              
591 3         8 my $req = $ctx->req;
592 3         16 my $f = $ctx->f;
593              
594 3 50       15 if( $closed ) {
595 0         0 $self->debug_printf( "ERROR closed" );
596 0 0       0 $f->fail( "Connection closed while awaiting chunk trailer", http => undef, $req ) unless $f->is_cancelled;
597             }
598              
599 3 50       71 $$buffref =~ s/^(.*)$CRLF// or return 0;
600 3         23 $trailer .= $1;
601 3         10 $ctx->resp_bytes += $+[0];
602              
603 3 50       25 return 1 if length $1;
604              
605             # TODO: Actually use the trailer
606              
607 3         12 $self->debug_printf( "BODY done [chunked]" );
608 3         13 return $on_done->( $ctx );
609 3         39 };
610             }
611              
612             sub _mk_on_read_length
613             {
614 54     54   88 shift; # $self
615 54         179 my ( $content_length, $on_more, $on_done ) = @_;
616              
617             sub {
618 55     55   340 my ( $self, $buffref, $closed, $ctx ) = @_;
619              
620 55         282 my $req = $ctx->req;
621 55         294 my $f = $ctx->f;
622              
623             # This will truncate it if the server provided too much
624 55         337 my $content = substr( $$buffref, 0, $content_length, "" );
625 55         149 $content_length -= length $content;
626 55         159 $ctx->resp_bytes += length $content;
627              
628 55 50       334 return undef unless $on_more->( $content );
629              
630 55 100       164 if( $content_length == 0 ) {
631 53         195 $self->debug_printf( "BODY done [length]" );
632 53         192 return $on_done->( $ctx );
633             }
634              
635 2 50       7 if( $closed ) {
636 0         0 $self->debug_printf( "ERROR closed" );
637 0 0       0 $f->fail( "Connection closed while awaiting body", http => undef, $req ) unless $f->is_cancelled;
638             }
639 2         5 return 0;
640 54         405 };
641             }
642              
643             sub _mk_on_read_until_eof
644             {
645 13     13   21 shift; # $self
646 13         26 my ( $on_more, $on_done ) = @_;
647              
648             sub {
649 27     27   172 my ( $self, $buffref, $closed, $ctx ) = @_;
650              
651 27         78 my $content = $$buffref;
652 27         47 $$buffref = "";
653 27         54 $ctx->resp_bytes += length $content;
654              
655 27 50       121 return undef unless $on_more->( $content );
656              
657 27 100       64 return 0 unless $closed;
658              
659 13         47 $self->debug_printf( "BODY done [eof]" );
660 13         49 return $on_done->( $ctx );
661 13         83 };
662             }
663              
664             =head1 AUTHOR
665              
666             Paul Evans <leonerd@leonerd.org.uk>
667              
668             =cut
669              
670             0x55AA;