File Coverage

blib/lib/Net/Async/HTTP/Connection.pm
Criterion Covered Total %
statement 326 348 93.6
branch 141 184 76.6
condition 62 84 73.8
subroutine 36 37 97.3
pod 4 9 44.4
total 569 662 85.9


line stmt bran cond sub pod time code
1             # You may distribute under the terms of either the GNU General Public License
2             # or the Artistic License (the same terms as Perl itself)
3             #
4             # (C) Paul Evans, 2008-2019 -- leonerd@leonerd.org.uk
5              
6             package Net::Async::HTTP::Connection;
7              
8 37     37   266 use strict;
  37         77  
  37         1066  
9 37     37   183 use warnings;
  37         68  
  37         1510  
10              
11             our $VERSION = '0.48';
12              
13 37     37   192 use Carp;
  37         72  
  37         2095  
14              
15 37     37   239 use base qw( IO::Async::Stream );
  37         80  
  37         22476  
16             IO::Async::Stream->VERSION( '0.59' ); # ->write( ..., on_write )
17              
18 37     37   1413410 use Net::Async::HTTP::StallTimer;
  37         99  
  37         1164  
19              
20 37     37   18347 use HTTP::Response;
  37         830437  
  37         2105  
21              
22             my $CRLF = "\x0d\x0a"; # More portable than \r\n
23              
24 37     37   415 use Struct::Dumb;
  37         288  
  37         351  
25             struct RequestContext => [qw( req on_read stall_timer resp_header resp_bytes on_done is_done f )],
26             named_constructor => 1;
27              
28             # Detect whether HTTP::Message properly trims whitespace in header values. If
29             # it doesn't, we have to deploy a workaround to fix them up.
30             # https://rt.cpan.org/Ticket/Display.html?id=75224
31 37     37   3787 use constant HTTP_MESSAGE_TRIMS_LWS => HTTP::Message->parse( "Name: value " )->header("Name") eq "value";
  37         94  
  37         346  
32              
33             =head1 NAME
34              
35             C - HTTP client protocol handler
36              
37             =head1 DESCRIPTION
38              
39             This class provides a connection to a single HTTP server, and is used
40             internally by L. It is not intended for general use.
41              
42             =cut
43              
44             sub _init
45             {
46 100     100   1728 my $self = shift;
47 100         634 $self->SUPER::_init( @_ );
48              
49 100         2325 $self->{requests_in_flight} = 0;
50             }
51              
52             sub configure
53             {
54 286     286 1 271680 my $self = shift;
55 286         984 my %params = @_;
56              
57 286         700 foreach (qw( pipeline max_in_flight ready_queue decode_content is_proxy )) {
58 1430 100       3376 $self->{$_} = delete $params{$_} if exists $params{$_};
59             }
60              
61 286 100       777 if( my $on_closed = $params{on_closed} ) {
62             $params{on_closed} = sub {
63 67     67   15527 my $self = shift;
64              
65 67         392 $self->debug_printf( "CLOSED in-flight=$self->{requests_in_flight}" );
66              
67 67         412 $self->error_all( "Connection closed" );
68              
69 67         149 undef $self->{ready_queue};
70 67         299 $on_closed->( $self );
71 100         519 };
72             }
73              
74 286 50       745 croak "max_in_flight parameter required, may be zero" unless defined $self->{max_in_flight};
75              
76 286         1236 $self->SUPER::configure( %params );
77             }
78              
79             sub should_pipeline
80             {
81 205     205 0 566 my $self = shift;
82             return $self->{pipeline} &&
83             $self->{can_pipeline} &&
84 205   66     1990 ( !$self->{max_in_flight} || $self->{requests_in_flight} < $self->{max_in_flight} );
85             }
86              
87             sub connect
88             {
89 100     100 1 217 my $self = shift;
90 100         519 my %args = @_;
91              
92 100         871 $self->debug_printf( "CONNECT $args{host}:$args{service}" );
93              
94 100 50       534 defined wantarray or die "VOID ->connect";
95              
96             $self->SUPER::connect(
97             socktype => "stream",
98             %args
99             )->on_done( sub {
100 92     92   35376 $self->debug_printf( "CONNECTED" );
101 100         581 });
102             }
103              
104             sub ready
105             {
106 207     207 0 482 my $self = shift;
107              
108 207 100       677 my $queue = $self->{ready_queue} or return;
109              
110 186 100 66     508 if( $self->should_pipeline ) {
    100          
111 79         279 $self->debug_printf( "READY pipelined" );
112 79   100     492 while( @$queue && $self->should_pipeline ) {
113 18         49 my $ready = shift @$queue;
114 18         78 my $f = $ready->future;
115 18 100       187 next if $f->is_cancelled;
116              
117 17 100       119 $ready->connecting and $ready->connecting->cancel;
118              
119 17         137 $f->done( $self );
120             }
121             }
122             elsif( @$queue and $self->is_idle ) {
123 99         407 $self->debug_printf( "READY non-pipelined" );
124 99         534 while( @$queue ) {
125 99         215 my $ready = shift @$queue;
126 99         403 my $f = $ready->future;
127 99 50       883 next if $f->is_cancelled;
128              
129 99 100       729 $ready->connecting and $ready->connecting->cancel;
130              
131 99         977 $f->done( $self );
132 99         4288 last;
133             }
134             }
135             else {
136 8 50       22 $self->debug_printf( "READY cannot-run queue=%d idle=%s",
137             scalar @$queue, $self->is_idle ? "Y" : "N");
138             }
139             }
140              
141             sub is_idle
142             {
143 158     158 0 317 my $self = shift;
144 158         782 return $self->{requests_in_flight} == 0;
145             }
146              
147             sub on_read
148             {
149 246     246 1 448260 my $self = shift;
150 246         654 my ( $buffref, $closed ) = @_;
151              
152 246         1821 while( my $head = $self->{request_queue}[0]) {
153 216 100 50     1544 shift @{ $self->{request_queue} } and next if $head->is_done;
  1         9  
154              
155 215 100       1614 $head->stall_timer->reset if $head->stall_timer;
156              
157 215         1957 my $ret = $head->on_read->( $self, $buffref, $closed, $head );
158              
159 214 100       620 if( defined $ret ) {
160 100 100       556 return $ret if !ref $ret;
161              
162 69         234 $head->on_read = $ret;
163 69         735 return 1;
164             }
165              
166 114 50       329 $head->is_done or die "ARGH: undef return without being marked done";
167              
168 114         817 shift @{ $self->{request_queue} };
  114         315  
169 114 100 100     749 return 1 if !$closed and length $$buffref;
170 112         6872 return;
171             }
172              
173             # Reinvoked after switch back to baseline, but may be idle again
174 31 100 66     169 return if $closed or !length $$buffref;
175              
176 1         10 $self->invoke_error( "Spurious on_read of connection while idle",
177             http_connection => read => $$buffref );
178 1         37 $$buffref = "";
179             }
180              
181             sub on_write_eof
182             {
183 0     0 1 0 my $self = shift;
184 0         0 $self->error_all( "Connection closed", http => undef, undef );
185             }
186              
187             sub error_all
188             {
189 67     67 0 125 my $self = shift;
190              
191 67         156 while( my $head = shift @{ $self->{request_queue} } ) {
  101         1714  
192 34 100 100     264 $head->f->fail( @_ ) unless $head->is_done or $head->f->is_ready;
193             }
194             }
195              
196             sub request
197             {
198 129     129 0 297 my $self = shift;
199 129         626 my %args = @_;
200              
201 129 50       450 my $on_header = $args{on_header} or croak "Expected 'on_header' as a CODE ref";
202              
203 129         280 my $req = $args{request};
204 129 50 33     1264 ref $req and $req->isa( "HTTP::Request" ) or croak "Expected 'request' as a HTTP::Request reference";
205              
206 129         555 $self->debug_printf( "REQUEST %s %s", $req->method, $req->uri );
207              
208 129         2854 my $request_body = $args{request_body};
209 129         286 my $expect_continue = !!$args{expect_continue};
210              
211 129         349 my $method = $req->method;
212              
213 129 100 100     2048 if( $method eq "POST" or $method eq "PUT" or length $req->content ) {
      100        
214 13         85 $req->init_header( "Content-Length", length $req->content );
215             }
216              
217 129 100       2698 if( $expect_continue ) {
218 1         5 $req->init_header( "Expect", "100-continue" );
219             }
220              
221 129 100       422 if( $self->{decode_content} ) {
222             #$req->init_header( "Accept-Encoding", Net::Async::HTTP->can_decode )
223 2         9 $req->init_header( "Accept-Encoding", "gzip" );
224             }
225              
226 129         588 my $f = $self->loop->new_future
227             ->set_label( "$method " . $req->uri );
228              
229             # TODO: Cancelling a request Future shouldn't necessarily close the socket
230             # if we haven't even started writing the request yet. But we can't know
231             # that currently.
232             $f->on_cancel( sub {
233 5     5   5721 $self->debug_printf( "CLOSE on_cancel" );
234 5         45 $self->close_now;
235 129         6563 });
236              
237 129         2577 my $stall_timer;
238 129 100       495 if( $args{stall_timeout} ) {
239             $stall_timer = Net::Async::HTTP::StallTimer->new(
240             delay => $args{stall_timeout},
241 4         37 future => $f,
242             );
243 4         247 $self->add_child( $stall_timer );
244             # Don't start it yet
245              
246             my $remove_timer = sub {
247 4 50   4   339 $self->remove_child( $stall_timer ) if $stall_timer;
248 4         452 undef $stall_timer;
249 4         284 };
250              
251 4         12 $f->on_ready( $remove_timer );
252             }
253              
254 129         752 push @{ $self->{request_queue} }, my $ctx = RequestContext(
255             req => $req,
256             on_read => undef, # will be set later
257             stall_timer => $stall_timer,
258             resp_header => undef, # will be set later
259             resp_bytes => 0,
260             on_done => $args{on_done},
261 129         305 is_done => 0,
262             f => $f,
263             );
264              
265 129         5666 my $on_body_write;
266 129 100 100     688 if( $stall_timer or $args{on_body_write} ) {
267 5         9 my $inner_on_body_write = $args{on_body_write};
268 5         8 my $written = 0;
269             $on_body_write = sub {
270 10 50   10   11904 $stall_timer->reset if $stall_timer;
271 10 50       36 $inner_on_body_write->( $written += $_[1] ) if $inner_on_body_write;
272 5         22 };
273             }
274              
275             my $write_request_body = defined $request_body ? sub {
276 4     4   12 my ( $self ) = @_;
277 4         14 $self->write( $request_body,
278             on_write => $on_body_write
279             );
280 129 100       379 } : undef;
281              
282             # Unless the request method is CONNECT, or we are connecting to a
283             # non-transparent proxy, the URL is not allowed to contain
284             # an authority; only path
285             # Take a copy of the headers since we'll be hacking them up
286 129         481 my $headers = $req->headers->clone;
287 129         14871 my $path;
288 129 50       409 if( $method eq "CONNECT" ) {
289 0         0 $path = $req->uri->as_string;
290             }
291             else {
292 129         387 my $uri = $req->uri;
293 129 100       1040 if ( $self->{is_proxy} ) {
294 1         18 $path = $uri->as_string;
295             }
296             else {
297 128         741 $path = $uri->path_query;
298 128 100       2425 $path = "/$path" unless $path =~ m{^/};
299             }
300 129         462 my $authority = $uri->authority;
301 129 100 100     2665 if( defined $authority and
302             my ( $user, $pass, $host ) = $authority =~ m/^(.*?):(.*)@(.*)$/ ) {
303 2         10 $headers->init_header( Host => $host );
304 2         63 $headers->authorization_basic( $user, $pass );
305             }
306             else {
307 127         467 $headers->init_header( Host => $authority );
308             }
309             }
310              
311 129   100     5382 my $protocol = $req->protocol || "HTTP/1.1";
312 129         2148 my @headers = ( "$method $path $protocol" );
313             $headers->scan( sub {
314 310     310   4170 my ( $name, $value ) = @_;
315 310         641 $name =~ s/^://; # non-canonical header
316 310         1039 push @headers, "$name: $value";
317 129         858 } );
318              
319 129 100       803 $stall_timer->start if $stall_timer;
320 129 100       932 $stall_timer->reason = "writing request" if $stall_timer;
321              
322 129 100   3   369 my $on_header_write = $stall_timer ? sub { $stall_timer->reset } : undef;
  3         1340  
323              
324 129         1616 $self->write( join( $CRLF, @headers ) .
325             $CRLF . $CRLF,
326             on_write => $on_header_write );
327              
328 129 100       25579 $self->write( $req->content,
329             on_write => $on_body_write ) if length $req->content;
330 129 100 100     2426 $write_request_body->( $self ) if $write_request_body and !$expect_continue;
331              
332             $self->write( "", on_flush => sub {
333 3 50   3   1664 return unless $stall_timer; # test again in case it was cancelled in the meantime
334 3         12 $stall_timer->reset;
335 3         595 $stall_timer->reason = "waiting for response";
336 129 100       506 }) if $stall_timer;
337              
338 129         439 $self->{requests_in_flight}++;
339              
340             $ctx->on_read = $self->_mk_on_read_header(
341 129 100       804 $args{previous_response}, $expect_continue ? $write_request_body : undef, $on_header
342             );
343              
344 129         1683 return $f;
345             }
346              
347             sub _mk_on_read_header
348             {
349 129     129   244 shift; # $self
350 129         500 my ( $previous_response, $write_request_body, $on_header ) = @_;
351              
352             sub {
353 121     121   806 my ( $self, $buffref, $closed, $ctx ) = @_;
354              
355 121         398 my $req = $ctx->req;
356 121         651 my $stall_timer = $ctx->stall_timer;
357 121         677 my $f = $ctx->f;
358              
359 121 100       704 if( $stall_timer ) {
360 2         7 $stall_timer->reason = "receiving response header";
361 2         5 $stall_timer->reset;
362             }
363              
364 121 100 100     1551 if( length $$buffref >= 4 and $$buffref !~ m/^HTTP/ ) {
365 1         4 $self->debug_printf( "ERROR fail" );
366 1 50       6 $f->fail( "Did not receive HTTP response from server", http => undef, $req ) unless $f->is_cancelled;
367 1         38 $self->close_now;
368             }
369              
370 121 100       1810 unless( $$buffref =~ s/^(.*?$CRLF$CRLF)//s ) {
371 3 100       13 if( $closed ) {
372 1         4 $self->debug_printf( "ERROR closed" );
373 1 50       6 $f->fail( "Connection closed while awaiting header", http => undef, $req ) unless $f->is_cancelled;
374 1         108 $self->close_now;
375             }
376 3         26 return 0;
377             }
378              
379 118         514 $ctx->resp_bytes += $+[0];
380              
381 118         1706 my $header = HTTP::Response->parse( $1 );
382             # HTTP::Response doesn't strip the \rs from this
383 118         26426 ( my $status_line = $header->status_line ) =~ s/\r$//;
384              
385 118         1495 $ctx->resp_header = $header;
386              
387 118         666 unless( HTTP_MESSAGE_TRIMS_LWS ) {
388 118         229 my @headers;
389             $header->scan( sub {
390 240         5945 my ( $name, $value ) = @_;
391 240         1061 s/^\s+//, s/\s+$// for $value;
392 240         744 push @headers, $name => $value;
393 118         1267 } );
394 118 100       1345 $header->header( @headers ) if @headers;
395             }
396              
397 118         9826 my $protocol = $header->protocol;
398 118 100 66     2093 if( $protocol =~ m{^HTTP/1\.(\d+)$} and $1 >= 1 ) {
399 111         341 $self->{can_pipeline} = 1;
400             }
401              
402 118 100       375 if( $header->code =~ m/^1/ ) { # 1xx is not a final response
403 1         30 $self->debug_printf( "HEADER [provisional] %s", $status_line );
404 1 50       9 $write_request_body->( $self ) if $write_request_body;
405 1         229 return 1;
406             }
407              
408 117         1675 $header->request( $req );
409 117 100       1334 $header->previous( $previous_response ) if $previous_response;
410              
411 117         539 $self->debug_printf( "HEADER %s", $status_line );
412              
413 117         536 my $on_body_chunk = $on_header->( $header );
414              
415 117         526 my $code = $header->code;
416              
417 117         1256 my $content_encoding = $header->header( "Content-Encoding" );
418              
419 117         4825 my $decoder;
420 117 100 66     462 if( $content_encoding and
421             $decoder = Net::Async::HTTP->can_decode( $content_encoding ) ) {
422 2         21 $header->init_header( "X-Original-Content-Encoding" => $header->remove_header( "Content-Encoding" ) );
423             }
424              
425             # can_pipeline is set for HTTP/1.1 or above; presume it can keep-alive if set
426 117   66     582 my $connection_close = lc( $header->header( "Connection" ) || ( $self->{can_pipeline} ? "keep-alive" : "close" ) )
427             eq "close";
428              
429 117 100 50     5286 if( $connection_close ) {
    50          
430 21         58 $self->{max_in_flight} = 1;
431             }
432             elsif( defined( my $keep_alive = lc( $header->header("Keep-Alive") || "" ) ) ) {
433 96         5477 my ( $max ) = ( $keep_alive =~ m/max=(\d+)/ );
434 96 50 33     374 $self->{max_in_flight} = $max if $max && $max < $self->{max_in_flight};
435             }
436              
437             my $on_more = sub {
438 81         202 my ( $chunk ) = @_;
439              
440 81 50 66     260 if( $decoder and not eval { $chunk = $decoder->( $chunk ); 1 } ) {
  2         7  
  2         9  
441 0         0 $self->debug_printf( "ERROR decode failed" );
442 0         0 $f->fail( "Decode error $@", http => undef, $req );
443 0         0 $self->close;
444 0         0 return undef;
445             }
446              
447 81         271 $on_body_chunk->( $chunk );
448              
449 81         1482 return 1;
450 117         681 };
451             my $on_done = sub {
452 115         306 my ( $ctx ) = @_;
453              
454 115         324 $ctx->is_done++;
455              
456             # TODO: IO::Async probably ought to do this. We need to fire the
457             # on_closed event _before_ calling on_body_chunk, to clear the
458             # connection cache in case another request comes - e.g. HEAD->GET
459 115 100       802 $self->close if $connection_close;
460              
461 115         527 my $final;
462 115 50 66     534 if( $decoder and not eval { $final = $decoder->(); 1 } ) {
  2         13  
  2         8  
463 0         0 $self->debug_printf( "ERROR decode failed" );
464 0         0 $f->fail( "Decode error $@", http => undef, $req );
465 0         0 $self->close;
466 0         0 return undef;
467             }
468              
469 115 50 66     484 $on_body_chunk->( $final ) if defined $final and length $final;
470              
471 115         336 my $response = $on_body_chunk->();
472 115 50       301 my $e = eval { $f->done( $response ) unless $f->is_cancelled; 1 } ? undef : $@;
  115 100       530  
  113         7856  
473              
474 115 50       517 $ctx->on_done->( $ctx ) if $ctx->on_done;
475              
476 115         1303 $self->{requests_in_flight}--;
477 115         876 $self->debug_printf( "DONE remaining in-flight=$self->{requests_in_flight}" );
478 115         736 $self->ready;
479              
480 115 100       1022 if( defined $e ) {
481 2         5 chomp $e;
482 2         12 $self->invoke_error( $e, perl => );
483             # This might not return, if it top-level croaks
484             }
485              
486 114         1020 return undef; # Finished
487 117         670 };
488              
489             # RFC 2616 says "HEAD" does not have a body, nor do any 1xx codes, nor
490             # 204 (No Content) nor 304 (Not Modified)
491 117 100 66     571 if( $req->method eq "HEAD" or $code =~ m/^1..$/ or $code eq "204" or $code eq "304" ) {
      66        
      66        
492 3         69 $self->debug_printf( "BODY done [none]" );
493 3         14 return $on_done->( $ctx );
494             }
495              
496 114         2489 my $transfer_encoding = $header->header( "Transfer-Encoding" );
497 114         4784 my $content_length = $header->content_length;
498              
499 114 100 66     4534 if( defined $transfer_encoding and $transfer_encoding eq "chunked" ) {
    100          
500 4         18 $self->debug_printf( "BODY chunks" );
501              
502 4 50       26 $stall_timer->reason = "receiving body chunks" if $stall_timer;
503 4         23 return $self->_mk_on_read_chunked( $on_more, $on_done );
504             }
505             elsif( defined $content_length ) {
506 99         626 $self->debug_printf( "BODY length $content_length" );
507              
508 99 100       561 if( $content_length == 0 ) {
509 48         163 $self->debug_printf( "BODY done [length=0]" );
510 48         245 return $on_done->( $ctx );
511             }
512              
513 51 100       161 $stall_timer->reason = "receiving body" if $stall_timer;
514 51         195 return $self->_mk_on_read_length( $content_length, $on_more, $on_done );
515             }
516             else {
517 11         93 $self->debug_printf( "BODY until EOF" );
518              
519 11 50       58 $stall_timer->reason = "receiving body until EOF" if $stall_timer;
520 11         51 return $self->_mk_on_read_until_eof( $on_more, $on_done );
521             }
522 129         1792 };
523             }
524              
525             sub _mk_on_read_chunked
526             {
527 4     4   9 shift; # $self
528 4         13 my ( $on_more, $on_done ) = @_;
529              
530 4         9 my $chunk_length;
531              
532             sub {
533 16     16   81 my ( $self, $buffref, $closed, $ctx ) = @_;
534              
535 16         42 my $req = $ctx->req;
536 16         80 my $f = $ctx->f;
537              
538 16 100 66     238 if( !defined $chunk_length and $$buffref =~ s/^(.*?)$CRLF// ) {
539 10         30 my $header = $1;
540 10         30 $ctx->resp_bytes += $+[0];
541              
542             # Chunk header
543 10 100       105 unless( $header =~ s/^([A-Fa-f0-9]+).*// ) {
544 1 50       6 $f->fail( "Corrupted chunk header", http => undef, $req ) unless $f->is_cancelled;
545 1         55 $self->close_now;
546 1         18 return 0;
547             }
548              
549 9         31 $chunk_length = hex( $1 );
550 9 100       50 return 1 if $chunk_length;
551              
552 3         41 return $self->_mk_on_read_chunk_trailer( $req, $on_more, $on_done, $f );
553             }
554              
555             # Chunk is followed by a CRLF, which isn't counted in the length;
556 6 50 33     38 if( defined $chunk_length and length( $$buffref ) >= $chunk_length + 2 ) {
557             # Chunk body
558 6         23 my $chunk = substr( $$buffref, 0, $chunk_length, "" );
559 6         18 $ctx->resp_bytes += length $chunk;
560              
561 6 50       73 unless( $$buffref =~ s/^$CRLF// ) {
562 0         0 $self->debug_printf( "ERROR chunk without CRLF" );
563 0 0       0 $f->fail( "Chunk of size $chunk_length wasn't followed by CRLF", http => undef, $req ) unless $f->is_cancelled;
564 0         0 $self->close;
565             }
566              
567 6         18 $ctx->resp_bytes += $+[0];
568              
569 6         39 undef $chunk_length;
570              
571 6         20 return $on_more->( $chunk );
572             }
573              
574 0 0       0 if( $closed ) {
575 0         0 $self->debug_printf( "ERROR closed" );
576 0 0       0 $f->fail( "Connection closed while awaiting chunk", http => undef, $req ) unless $f->is_cancelled;
577             }
578 0         0 return 0;
579 4         40 };
580             }
581              
582             sub _mk_on_read_chunk_trailer
583             {
584 3     3   8 shift; # $self
585 3         12 my ( undef, $on_more, $on_done ) = @_;
586              
587 3         10 my $trailer = "";
588              
589             sub {
590 3     3   19 my ( $self, $buffref, $closed, $ctx ) = @_;
591              
592 3         11 my $req = $ctx->req;
593 3         20 my $f = $ctx->f;
594              
595 3 50       21 if( $closed ) {
596 0         0 $self->debug_printf( "ERROR closed" );
597 0 0       0 $f->fail( "Connection closed while awaiting chunk trailer", http => undef, $req ) unless $f->is_cancelled;
598             }
599              
600 3 50       71 $$buffref =~ s/^(.*)$CRLF// or return 0;
601 3         14 $trailer .= $1;
602 3         11 $ctx->resp_bytes += $+[0];
603              
604 3 50       26 return 1 if length $1;
605              
606             # TODO: Actually use the trailer
607              
608 3         17 $self->debug_printf( "BODY done [chunked]" );
609 3         14 return $on_done->( $ctx );
610 3         44 };
611             }
612              
613             sub _mk_on_read_length
614             {
615 51     51   87 shift; # $self
616 51         142 my ( $content_length, $on_more, $on_done ) = @_;
617              
618             sub {
619 52     52   375 my ( $self, $buffref, $closed, $ctx ) = @_;
620              
621 52         206 my $req = $ctx->req;
622 52         396 my $f = $ctx->f;
623              
624             # This will truncate it if the server provided too much
625 52         374 my $content = substr( $$buffref, 0, $content_length, "" );
626 52         361 $content_length -= length $content;
627 52         174 $ctx->resp_bytes += length $content;
628              
629 52 50       348 return undef unless $on_more->( $content );
630              
631 52 100       163 if( $content_length == 0 ) {
632 50         173 $self->debug_printf( "BODY done [length]" );
633 50         200 return $on_done->( $ctx );
634             }
635              
636 2 50       5 if( $closed ) {
637 0         0 $self->debug_printf( "ERROR closed" );
638 0 0       0 $f->fail( "Connection closed while awaiting body", http => undef, $req ) unless $f->is_cancelled;
639             }
640 2         5 return 0;
641 51         408 };
642             }
643              
644             sub _mk_on_read_until_eof
645             {
646 11     11   20 shift; # $self
647 11         30 my ( $on_more, $on_done ) = @_;
648              
649             sub {
650 23     23   128 my ( $self, $buffref, $closed, $ctx ) = @_;
651              
652 23         46 my $content = $$buffref;
653 23         43 $$buffref = "";
654 23         55 $ctx->resp_bytes += length $content;
655              
656 23 50       207 return undef unless $on_more->( $content );
657              
658 23 100       77 return 0 unless $closed;
659              
660 11         44 $self->debug_printf( "BODY done [eof]" );
661 11         49 return $on_done->( $ctx );
662 11         82 };
663             }
664              
665             =head1 AUTHOR
666              
667             Paul Evans
668              
669             =cut
670              
671             0x55AA;