File Coverage

blib/lib/Net/Async/HTTP.pm
Criterion Covered Total %
statement 335 376 89.1
branch 127 166 76.5
condition 55 85 64.7
subroutine 56 60 93.3
pod 8 13 61.5
total 581 700 83.0


line stmt bran cond sub pod time code
1             # You may distribute under the terms of either the GNU General Public License
2             # or the Artistic License (the same terms as Perl itself)
3             #
4             # (C) Paul Evans, 2008-2021 -- leonerd@leonerd.org.uk
5              
6             package Net::Async::HTTP;
7              
8 37     37   3775878 use strict;
  37         456  
  37         1103  
9 37     37   190 use warnings;
  37         69  
  37         1141  
10 37     37   467 use v5.10; # //
  37         123  
11 37     37   232 use base qw( IO::Async::Notifier );
  37         65  
  37         20927  
12              
13             our $VERSION = '0.48';
14              
15             our $DEFAULT_UA = "Perl + " . __PACKAGE__ . "/$VERSION";
16             our $DEFAULT_MAXREDIR = 3;
17             our $DEFAULT_MAX_IN_FLIGHT = 4;
18             our $DEFAULT_MAX_CONNS_PER_HOST = $ENV{NET_ASYNC_HTTP_MAXCONNS} // 1;
19              
20 37     37   511394 use Carp;
  37         95  
  37         2669  
21              
22 37     37   19295 use Net::Async::HTTP::Connection;
  37         124  
  37         1422  
23              
24 37     37   20182 use HTTP::Request;
  37         39481  
  37         1066  
25 37     37   18811 use HTTP::Request::Common qw();
  37         79340  
  37         1040  
26 37     37   302 use URI;
  37         760  
  37         1325  
27              
28 37     37   233 use IO::Async::Stream 0.59;
  37         814  
  37         1162  
29 37     37   1018 use IO::Async::Loop 0.59; # ->connect( handle ) ==> $stream
  37         10753  
  37         818  
30              
31 37     37   188 use Future 0.28; # ->set_label
  37         512  
  37         873  
32 37     37   212 use Future::Utils 0.16 qw( repeat );
  37         586  
  37         2725  
33              
34 37         306 use Metrics::Any 0.05 '$metrics',
35             strict => 1,
36 37     37   228 name_prefix => [qw( http client )];
  37         592  
37              
38 37     37   3213 use Scalar::Util qw( blessed reftype );
  37         66  
  37         2044  
39 37     37   226 use Time::HiRes qw( time );
  37         104  
  37         324  
40 37     37   5202 use List::Util 1.29 qw( first pairs pairgrep );
  37         826  
  37         3054  
41 37         3134 use Socket 2.010 qw(
42             SOCK_STREAM IPPROTO_IP IP_TOS
43             IPTOS_LOWDELAY IPTOS_THROUGHPUT IPTOS_RELIABILITY IPTOS_MINCOST
44 37     37   278 );
  37         671  
45              
46 37     37   244 use constant HTTP_PORT => 80;
  37         68  
  37         3037  
47 37     37   247 use constant HTTPS_PORT => 443;
  37         69  
  37         2159  
48              
49 37     37   273 use constant READ_LEN => 64*1024; # 64 KiB
  37         65  
  37         1945  
50 37     37   230 use constant WRITE_LEN => 64*1024; # 64 KiB
  37         89  
  37         2018  
51              
52 37     37   277 use Struct::Dumb 0.07; # equallity operator overloading
  37         799  
  37         234  
53             struct Ready => [qw( future connecting )];
54              
55             =head1 NAME
56              
57             C<Net::Async::HTTP> - use HTTP with C<IO::Async>
58              
59             =head1 SYNOPSIS
60              
61             use Future::AsyncAwait;
62              
63             use IO::Async::Loop;
64             use Net::Async::HTTP;
65             use URI;
66              
67             my $loop = IO::Async::Loop->new();
68              
69             my $http = Net::Async::HTTP->new();
70              
71             $loop->add( $http );
72              
73             my $response = await $http->do_request(
74             uri => URI->new( "http://www.cpan.org/" ),
75             );
76              
77             print "Front page of http://www.cpan.org/ is:\n";
78             print $response->as_string;
79              
80             =head1 DESCRIPTION
81              
82             This object class implements an asynchronous HTTP user agent. It sends
83             requests to servers, returning L<Future> instances to yield responses when
84             they are received. The object supports multiple concurrent connections to
85             servers, and allows multiple requests in the pipeline to any one connection.
86             Normally, only one such object will be needed per program to support any
87             number of requests.
88              
89             As well as using futures the module also supports a callback-based interface.
90              
91             This module optionally supports SSL connections, if L<IO::Async::SSL> is
92             installed. If so, SSL can be requested either by passing a URI with the
93             C<https> scheme, or by passing a true value as the C<SSL> parameter.
94              
95             =head2 Connection Pooling
96              
97             There are three ways in which connections to HTTP server hosts are managed by
98             this object, controlled by the value of C<max_connections_per_host>. This
99             controls when new connections are established to servers, as compared to
100             waiting for existing connections to be free, as new requests are made to them.
101              
102             They are:
103              
104             =over 2
105              
106             =item max_connections_per_host = 1
107              
108             This is the default setting. In this mode, there will be one connection per
109             host on which there are active or pending requests. If new requests are made
110             while an existing one is outstanding, they will be queued to wait for it.
111              
112             If pipelining is active on the connection (because both the C<pipeline> option
113             is true and the connection is known to be an HTTP/1.1 server), then requests
114             will be pipelined into the connection awaiting their response. If not, they
115             will be queued awaiting a response to the previous before sending the next.
116              
117             =item max_connections_per_host > 1
118              
119             In this mode, there can be more than one connection per host. If a new request
120             is made, it will try to re-use idle connections if there are any, or if they
121             are all busy it will create a new connection to the host, up to the configured
122             limit.
123              
124             =item max_connections_per_host = 0
125              
126             In this mode, there is no upper limit to the number of connections per host.
127             Every new request will try to reuse an idle connection, or else create a new
128             one if all the existing ones are busy.
129              
130             =back
131              
132             These modes all apply per hostname / server port pair; they do not affect the
133             behaviour of connections made to differing hostnames, or differing ports on
134             the same hostname.
135              
136             =cut
137              
138             $metrics->make_gauge( requests_in_flight =>
                # These registrations are top-level statements, so they run once when the
                # module is loaded; $metrics is imported with the (http, client) name prefix.
139             description => "Count of the number of requests sent that have not yet been completed",
140             # no labels
141             );
                # Counter of requests sent, labelled by HTTP method
142             $metrics->make_counter( requests =>
143             description => "Number of HTTP requests sent",
144             labels => [qw( method )],
145             );
                # Counter of responses received, labelled by request method and status code
146             $metrics->make_counter( responses =>
147             description => "Number of HTTP responses received",
148             labels => [qw( method code )],
149             );
                # Timer for how long requests wait for their response
150             $metrics->make_timer( request_duration =>
151             description => "Duration of time spent waiting for responses",
152             # no labels
153             );
                # Distribution of response sizes; given an explicit name rather than the default
154             $metrics->make_distribution( response_bytes =>
155             name => [qw( response bytes )],
156             description => "The size in bytes of responses received",
157             units => "bytes",
158             # no labels
159             );
160              
161             sub _init
162             {
                # Seeds a new object's internal state with its defaults: an empty
                # connection table, the default buffer lengths, the default per-host
                # connection limit and an empty set of SSL parameters.
163 35     35   188418 my $self = shift;
164              
165 35         185 $self->{connections} = {}; # { "$host:$port" } -> [ @connections ]
166              
167 35         104 $self->{read_len} = READ_LEN;
168 35         94 $self->{write_len} = WRITE_LEN;
169              
                # $DEFAULT_MAX_CONNS_PER_HOST is taken from $ENV{NET_ASYNC_HTTP_MAXCONNS}, else 1
170 35         88 $self->{max_connections_per_host} = $DEFAULT_MAX_CONNS_PER_HOST;
171              
                # Filled in later by configure() from any SSL_* parameters
172 35         108 $self->{ssl_params} = {};
173             }
174              
175             sub _remove_from_loop
176             {
                # Closes every connection held in $self->{connections}, then defers to
                # the parent class implementation with the original arguments.
177 5     5   2863 my $self = shift;
178              
                # {connections} maps each key to an array ref, so flatten them into one list
179 5         12 foreach my $conn ( map { @$_ } values %{ $self->{connections} } ) {
  6         21  
  5         20  
180 4         26 $conn->close;
181             }
182              
183 5         157 $self->SUPER::_remove_from_loop( @_ );
184             }
185              
186             =head1 PARAMETERS
187              
188             The following named parameters may be passed to C<new> or C<configure>:
189              
190             =head2 user_agent => STRING
191              
192             A string to set in the C<User-Agent> HTTP header. If not supplied, one will
193             be constructed that declares C<Net::Async::HTTP> and the version number.
194              
195             =head2 headers => ARRAY or HASH
196              
197             I
198              
199             A set of extra headers to apply to every outgoing request. May be specified
200             either as an even-sized array containing key/value pairs, or a hash.
201              
202             Individual header values may be added or changed without replacing the entire
203             set by using the L</configure> method and passing a key called C<+headers>:
204              
205             $http->configure( +headers => { One_More => "Key" } );
206              
207             =head2 max_redirects => INT
208              
209             Optional. How many levels of redirection to follow. If not supplied, will
210             default to 3. Give 0 to disable redirection entirely.
211              
212             =head2 max_in_flight => INT
213              
214             Optional. The maximum number of in-flight requests to allow per host when
215             pipelining is enabled and supported on that host. If more requests are made
216             over this limit they will be queued internally by the object and not sent to
217             the server until responses are received. If not supplied, will default to 4.
218             Give 0 to disable the limit entirely.
219              
220             =head2 max_connections_per_host => INT
221              
222             Optional. Controls the maximum number of connections per hostname/server port
223             pair, before requests will be queued awaiting one to be free. Give 0 to
224             disable the limit entirely. See also the L</Connection Pooling> section
225             documented above.
226              
227             Currently, if not supplied it will default to 1. However, it has been found in
228             practice that most programs will raise this limit to something higher, perhaps
229             3 or 4. Therefore, a future version of this module may set a higher value.
230              
231             To test if your application will handle this correctly, you can set a
232             different default by setting an environment variable:
233              
234             $ NET_ASYNC_HTTP_MAXCONNS=3 perl ...
235              
236             =head2 timeout => NUM
237              
238             Optional. How long in seconds to wait before giving up on a request. If not
239             supplied then no default will be applied, and no timeout will take place.
240              
241             =head2 stall_timeout => NUM
242              
243             Optional. How long in seconds to wait after each write or read of data on a
244             socket, before giving up on a request. This may be more useful than
245             C<timeout> on large-file operations, as it will not time out provided that
246             regular progress is still being made.
247              
248             =head2 proxy_host => STRING
249              
250             =head2 proxy_port => INT
251              
252             Optional. Default values to apply to each C method.
253              
254             =head2 cookie_jar => HTTP::Cookies
255              
256             Optional. A reference to a L<HTTP::Cookies> object. Will be used to set
257             cookies in requests and store them from responses.
258              
259             =head2 pipeline => BOOL
260              
261             Optional. If false, disables HTTP/1.1-style request pipelining.
262              
263             =head2 close_after_request => BOOL
264              
265             I
266              
267             Optional. If true, will set the C<Connection: close> header on outgoing
268             requests and disable pipelining, thus making every request use a new
269             connection.
270              
271             =head2 family => INT
272              
273             =head2 local_host => STRING
274              
275             =head2 local_port => INT
276              
277             =head2 local_addrs => ARRAY
278              
279             =head2 local_addr => HASH or ARRAY
280              
281             Optional. Parameters to pass on to the C<connect> method used to connect
282             sockets to HTTP servers. Sets the socket family and local socket address to
283             C<bind()> to. For more detail, see the documentation in
284             L<IO::Async::Connector>.
285              
286             =head2 fail_on_error => BOOL
287              
288             Optional. Affects the behaviour of response handling when a C<4xx> or C<5xx>
289             response code is received. When false, these responses will be processed as
290             other responses and yielded as the result of the future, or passed to the
291             C<on_response> callback. When true, such an error response causes the future
292             to fail, or the C<on_error> callback to be invoked.
293              
294             The HTTP response and request objects will be passed as well as the code and
295             message, and the failure name will be C<http>.
296              
297             ( $code_message, "http", $response, $request ) = $f->failure
298              
299             $on_error->( "$code $message", $response, $request )
300              
301             =head2 read_len => INT
302              
303             =head2 write_len => INT
304              
305             Optional. Used to set the reading and writing buffer lengths on the underlying
306             C<IO::Async::Stream> objects that represent connections to the server. If not
307             defined, a default of 64 KiB will be used.
308              
309             =head2 ip_tos => INT or STRING
310              
311             Optional. Used to set the C<IP_TOS> socket option on client sockets. If given,
312             should either be a C<IPTOS_*> constant, or one of the string names
313             C<lowdelay>, C<throughput>, C<reliability> or C<mincost>. If undefined or left
314             absent, no option will be set.
315              
316             =head2 decode_content => BOOL
317              
318             Optional. If true, incoming responses that have a recognised
319             C<Content-Encoding> are handled by the module, and decompressed content is
320             passed to the body handling callback or returned in the C<HTTP::Response>. See
321             L</CONTENT DECODING> below for details of which encoding types are recognised.
322             When this option is enabled, outgoing requests also have the
323             C<Accept-Encoding> header added to them if it does not already exist.
324              
325             Currently the default is false, because this behaviour is new, but it may
326             default to true in a later version. Applications which care which behaviour
327             applies should set this to a defined value to ensure it doesn't change.
328              
329             =head2 SSL_*
330              
331             Additionally, any parameters whose names start with C<SSL_> will be stored and
332             passed on requests to perform SSL requests. This simplifies configuration of
333             common SSL parameters.
334              
335             =head2 require_SSL => BOOL
336              
337             Optional. If true, then any attempt to make a request that does not use SSL
338             (either by calling C, or as a result of a redirection) will
339             immediately fail.
340              
341             =head2 SOCKS_*
342              
343             I
344              
345             Additionally, any parameters whose names start with C<SOCKS_> will be stored
346             and used by L<Net::Async::SOCKS> to establish connections via a configured
347             proxy.
348              
349             =cut
350              
351             sub configure
352             {
                # Applies named configuration parameters to the object.
                #
                # Recognised keys are stored on $self and removed from %params; whatever
                # is left over is passed to the parent class's configure(). Defaults for
                # user_agent, max_redirects, max_in_flight and pipeline are filled in at
                # the end if they are still undefined.
353 47     47 1 15064 my $self = shift;
354 47         156 my %params = @_;
355              
                # Simple parameters: stored as-is under the same key, only when supplied
356 47         216 foreach (qw( user_agent max_redirects max_in_flight max_connections_per_host
357             timeout stall_timeout proxy_host proxy_port cookie_jar pipeline
358             close_after_request family local_host local_port local_addrs local_addr
359             fail_on_error read_len write_len decode_content require_SSL ))
360             {
361 987 100       1762 $self->{$_} = delete $params{$_} if exists $params{$_};
362             }
363              
364             # Always store internally as ARRAY ref
                # (a HASH ref is flattened to a key/value list; anything else croaks)
365 47 100       236 if( my $headers = delete $params{headers} ) {
366 1 50       11 @{ $self->{headers} } =
  1 50       3  
367             ( ref $headers eq "ARRAY" ) ? @$headers :
368             ( ref $headers eq "HASH" ) ? %$headers :
369             croak "Expected 'headers' to be either ARRAY or HASH reference";
370             }
371              
                # '+headers' merges into the existing set: stored pairs whose name appears
                # in the new list are dropped, then the new pairs are appended.
372 47 100       246 if( my $more = delete $params{"+headers"} ) {
373 1 50       10 my @more =
    50          
374             ( ref $more eq "ARRAY" ) ? @$more :
375             ( ref $more eq "HASH" ) ? %$more :
376             croak "Expected '+headers' to be either ARRAY or HASH reference";
377 1         4 my %to_remove = @more;
378              
                # NOTE(review): $headers is a copy of $self->{headers}. If no 'headers'
                # were configured earlier it is undef here, so the assignment below
                # would appear to populate a fresh local array and leave
                # $self->{headers} unset - confirm whether that case needs handling.
379 1         3 my $headers = $self->{headers};
380 1     1   10 @$headers = ( ( pairgrep { !exists $to_remove{$a} } @$headers ), @more );
  1         6  
381             }
382              
                # Parameters named SSL_* are collected into {ssl_params}
383 47         187 foreach ( grep { m/^SSL_/ } keys %params ) {
  2         11  
384 0         0 $self->{ssl_params}{$_} = delete $params{$_};
385             }
386              
                # Parameters named SOCKS_* are collected into {socks_params}
387 47         152 foreach ( grep { m/^SOCKS_/ } keys %params ) {
  2         8  
388 0         0 $self->{socks_params}{$_} = delete $params{$_};
389             }
390              
                # ip_tos may be given as one of four symbolic names, which are mapped to
                # the corresponding IPTOS_* constant; any other value is stored unchanged.
391 47 50       171 if( exists $params{ip_tos} ) {
392             # TODO: This conversion should live in IO::Async somewhere
393 0         0 my $ip_tos = delete $params{ip_tos};
394 0 0 0     0 $ip_tos = IPTOS_LOWDELAY if defined $ip_tos and $ip_tos eq "lowdelay";
395 0 0 0     0 $ip_tos = IPTOS_THROUGHPUT if defined $ip_tos and $ip_tos eq "throughput";
396 0 0 0     0 $ip_tos = IPTOS_RELIABILITY if defined $ip_tos and $ip_tos eq "reliability";
397 0 0 0     0 $ip_tos = IPTOS_MINCOST if defined $ip_tos and $ip_tos eq "mincost";
398 0         0 $self->{ip_tos} = $ip_tos;
399             }
400              
                # Everything not consumed above goes to the parent class
401 47         326 $self->SUPER::configure( %params );
402              
                # Fill in defaults for anything still undefined
403 47 100       741 defined $self->{user_agent} or $self->{user_agent} = $DEFAULT_UA;
404 47 100       237 defined $self->{max_redirects} or $self->{max_redirects} = $DEFAULT_MAXREDIR;
405 47 100       186 defined $self->{max_in_flight} or $self->{max_in_flight} = $DEFAULT_MAX_IN_FLIGHT;
406 47 100       229 defined $self->{pipeline} or $self->{pipeline} = 1;
407             }
408              
409             =head1 METHODS
410              
411             The following methods documented in an C<await> expression return L<Future>
412             instances.
413              
414             When returning a Future, the following methods all indicate HTTP-level errors
415             using the Future failure name of C<http>. If the error relates to a specific
416             response it will be included. The original request is also included.
417              
418             $f->fail( $message, "http", $response, $request )
419              
420             =cut
421              
422             sub connect_connection
423             {
                # Starts connecting the given connection object to host:port and returns
                # the resulting future.
                #
                # Named arguments read here: conn, host, port (all removed from %args),
                # on_error (read but left in %args), family and SSL. Everything still
                # in %args is passed through to $conn->connect.
424 100     100 0 213 my $self = shift;
425 100         455 my %args = @_;
426              
427 100         244 my $conn = delete $args{conn};
428              
429 100         299 my $host = delete $args{host};
430 100         208 my $port = delete $args{port};
431              
432 100         193 my $on_error = $args{on_error};
433              
                # If SOCKS_* parameters were configured, load Net::Async::SOCKS (at least
                # version 0.003), request the "SOCKS" extension and pass the parameters on
434 100 50       369 if( my $socks_params = $self->{socks_params} ) {
435 0         0 require Net::Async::SOCKS;
436 0         0 Net::Async::SOCKS->VERSION( '0.003' );
437              
438 0         0 unshift @{ $args{extensions} }, "SOCKS";
  0         0  
439 0         0 $args{$_} = $socks_params->{$_} for keys %$socks_params;
440             }
441              
                # For SSL, load IO::Async::SSL on demand and request the "SSL" extension
442 100 50       305 if( $args{SSL} ) {
443 0         0 require IO::Async::SSL;
444 0         0 IO::Async::SSL->VERSION( '0.12' ); # 0.12 has ->connect(handle) bugfix
445              
446 0         0 unshift @{ $args{extensions} }, "SSL";
  0         0  
447             }
448              
                # family falls back from the per-call value to the configured one, then 0.
                # The local_* settings are only passed when they are defined on the object.
449             my $f = $conn->connect(
450             host => $host,
451             service => $port,
452             family => ( $args{family} || $self->{family} || 0 ),
453 400 50       1228 ( map { defined $self->{$_} ? ( $_ => $self->{$_} ) : () }
454             qw( local_host local_port local_addrs local_addr ) ),
455              
456             %args,
457             )->on_done( sub {
                # On success: rename the notifier to include the file descriptor, apply
                # IP_TOS if one was configured, then call ->ready on the stream
458 92     92   2215 my ( $stream ) = @_;
459 92         457 $stream->configure(
460             notifier_name => "$host:$port,fd=" . $stream->read_handle->fileno,
461             );
462              
463             # Defend against ->setsockopt doing silly things like detecting SvPOK()
464 92 50       7467 $stream->read_handle->setsockopt( IPPROTO_IP, IP_TOS, $self->{ip_tos}+0 ) if defined $self->{ip_tos};
465              
466 92         320 $stream->ready;
467             })->on_fail( sub {
                # On failure: report through on_error, using the first and last
                # elements of the failure list in the message
468 7     7   9832 $on_error->( $conn, "$host:$port - $_[0] failed [$_[-1]]" );
469 100   50     963 });
470              
                # The callback captures $f, so the future refers to itself until it
                # becomes ready, at which point the callback clears that reference.
471 100     100   34827 $f->on_ready( sub { undef $f } ); # intentionally cycle
  100         2007  
472             }
473              
474             sub get_connection
475             {
                # Returns a future for a connection to the given host and port.
                #
                # Named arguments read here: host, port, ready (an existing Ready entry
                # to reuse) and is_proxy. The whole of %args is also forwarded to
                # connect_connection(). Croaks if the object has no loop.
476 141     141 0 357 my $self = shift;
477 141         590 my %args = @_;
478              
                # NOTE(review): $loop is only used for this guard; the code below calls
                # $self->loop instead.
479 141 50       660 my $loop = $self->get_loop or croak "Cannot ->get_connection without a Loop";
480              
481 141         986 my $host = $args{host};
482 141         311 my $port = $args{port};
483              
                # Connections and waiting Ready entries are both tracked per "$host:$port"
484 141         410 my $key = "$host:$port";
485 141   100     672 my $conns = $self->{connections}{$key} ||= [];
486 141   100     674 my $ready_queue = $self->{ready_queue}{$key} ||= [];
487              
488             # Have a look to see if there are any idle connected ones first
489 141         399 foreach my $conn ( @$conns ) {
490 51 100 100     192 $conn->is_idle and $conn->read_handle and return Future->done( $conn );
491             }
492              
                # Otherwise the caller waits on a Ready entry: the one passed in, or a
                # new one pushed onto the queue with its 'connecting' field set to 0
493 128         326 my $ready = $args{ready};
494 128 100       903 $ready or push @$ready_queue, $ready =
495             Ready( $self->loop->new_future->set_label( "[ready $host:$port]" ), 0 );
496              
497 128         36783 my $f = $ready->future;
498              
                # At the per-host limit (0 means no limit): return the pending future
                # without starting a new connection
499 128         900 my $max = $self->{max_connections_per_host};
500 128 100 100     636 if( $max and @$conns >= $max ) {
501 28         209 return $f;
502             }
503              
                # Build a new connection object; it is given the same ready queue
504             my $conn = Net::Async::HTTP::Connection->new(
505             notifier_name => "$host:$port,connecting",
506             ready_queue => $ready_queue,
507 400         2427 ( map { $_ => $self->{$_} }
508             qw( max_in_flight read_len write_len decode_content ) ),
509             pipeline => ( $self->{pipeline} && !$self->{close_after_request} ),
510             is_proxy => $args{is_proxy},
511              
512             on_closed => sub {
                # When the connection closes, drop it from the list and, if a queued
                # Ready entry has no connection attempt yet, start one for it
513 67     67   162 my $conn = shift;
514 67         309 my $http = $conn->parent;
515              
516 67         724 $conn->remove_from_parent;
517 67         8752 @$conns = grep { $_ != $conn } @$conns;
  67         297  
518              
519 67 100       835 if( my $next = first { !$_->connecting } @$ready_queue ) {
  2         11  
520             # Requeue another connection attempt as there's still more to do
521 2         109 $http->get_connection( %args, ready => $next );
522             }
523             },
524 100   100     458 );
525              
526 100         10145 $self->add_child( $conn );
527 100         13502 push @$conns, $conn;
528              
                # Record the connect future on the Ready entry so that it counts as
                # already connecting
529             $ready->connecting = $self->connect_connection( %args,
530             conn => $conn,
531             on_error => sub {
                # Connect failed: fail the caller's future unless it was cancelled,
                # then remove both the connection and this Ready entry
532 7     7   15 my $conn = shift;
533              
534 7 50       26 $f->fail( @_ ) unless $f->is_cancelled;
535              
536 7         1060 $conn->remove_from_parent;
537 7         860 @$conns = grep { $_ != $conn } @$conns;
  10         32  
538 7         14 @$ready_queue = grep { $_ != $ready } @$ready_queue;
  12         122  
539              
540 7 100       80 if( my $next = first { !$_->connecting } @$ready_queue ) {
  5         17  
541             # Requeue another connection attempt as there's still more to do
542 2         29 $self->get_connection( %args, ready => $next );
543             }
544             },
545             )->on_cancel( sub {
                # If the connect future is cancelled, just discard the connection
546 1     1   29 $conn->remove_from_parent;
547 1         125 @$conns = grep { $_ != $conn } @$conns;
  2         7  
548 100         863 });
549              
550 100         4449 return $f;
551             }
552              
553             =head2 do_request
554              
555             $response = await $http->do_request( %args );
556              
557             Send an HTTP request to a server, returning a L<Future> that will yield the
558             response. The request may be represented by an L<HTTP::Request> object, or a
559             L<URI> object, depending on the arguments passed.
560              
561             The following named arguments are used for C<HTTP::Request>s:
562              
563             =over 8
564              
565             =item request => HTTP::Request
566              
567             A reference to an C<HTTP::Request> object
568              
569             =item host => STRING
570              
571             Hostname of the server to connect to
572              
573             =item port => INT or STRING
574              
575             Optional. Port number or service of the server to connect to. If not defined,
576             will default to C<http> or C<https> depending on whether SSL is being used.
577              
578             =item family => INT
579              
580             Optional. Restricts the socket family for connecting. If not defined, will
581             default to the globally-configured value in the object.
582              
583             =item SSL => BOOL
584              
585             Optional. If true, an SSL connection will be used.
586              
587             =back
588              
589             The following named arguments are used for C<URI> requests:
590              
591             =over 8
592              
593             =item uri => URI or STRING
594              
595             A reference to a C<URI> object, or a plain string giving the request URI. If
596             the scheme is C<https> then an SSL connection will be used.
597              
598             =item method => STRING
599              
600             Optional. The HTTP method name. If missing, C<GET> is used.
601              
602             =item content => STRING or ARRAY ref
603              
604             Optional. The body content to use for C<POST> or C<PUT> requests.
605              
606             If this is a plain scalar it will be used directly, and a C<content_type>
607             field must also be supplied to describe it.
608              
609             If this is an ARRAY ref and the request method is C<POST>, it will be form
610             encoded. It should contain an even-sized list of field names and values. For
611             more detail see L<HTTP::Request::Common/POST>.
612              
613             =item content_type => STRING
614              
615             The type of non-form data C<content>.
616              
617             =item user => STRING
618              
619             =item pass => STRING
620              
621             Optional. If both are given, the HTTP Basic Authorization header will be sent
622             with these details.
623              
624             =item headers => ARRAY|HASH
625              
626             Optional. If provided, contains additional HTTP headers to set on the
627             constructed request object. If provided as an ARRAY reference, it should
628             contain an even-sized list of name/value pairs.
629              
630             =item proxy_host => STRING
631              
632             =item proxy_port => INT
633              
634             Optional. Override the hostname or port number implied by the URI.
635              
636             =back
637              
638             For either request type, it takes the following arguments:
639              
640             =over 8
641              
642             =item request_body => STRING | CODE | Future
643              
644             Optional. Allows request body content to be generated by a future or
645             callback, rather than being provided as part of the C<request> object. This
646             can either be a plain string, a C<CODE> reference to a generator function, or
647             a future.
648              
649             As this is passed to the underlying L<IO::Async::Stream> C<write> method, the
650             usual semantics apply here. If passed a C<CODE> reference, it will be called
651             repeatedly whenever it's safe to write. The code should return C<undef>
652             to indicate completion. If passed a C<Future> it is expected to eventually
653             yield the body value.
654              
655             As with the C<content> parameter, the C<content_type> field should be
656             specified explicitly in the request header, as should the content length
657             (typically via the L<HTTP::Request> C<content_length> method). See also
658             F<examples/PUT.pl>.
659              
660             =item expect_continue => BOOL
661              
662             Optional. If true, sets the C<Expect> request header to the value
663             C<100-continue> and does not send the C<request_body> parameter until a
664             C<100 Continue> response is received from the server. If an error response is
665             received then the C<request_body> code, if present, will not be invoked.
666              
667             =item on_ready => CODE
668              
669             Optional. A callback that is invoked once a socket connection is established
670             with the HTTP server, but before the request is actually sent over it. This
671             may be used by the client code to inspect the socket, or perform any other
672             operations on it. This code is expected to return a L<Future>; only once that
673             has completed will the request cycle continue. If it fails, that failure is
674             propagated to the caller.
675              
676             $f = $on_ready->( $connection )
677              
678             =item on_redirect => CODE
679              
680             Optional. A callback that is invoked if a redirect response is received,
681             before the new location is fetched. It will be passed the response and the new
682             URL.
683              
684             $on_redirect->( $response, $location )
685              
686             =item on_body_write => CODE
687              
688             Optional. A callback that is invoked after each successful C<syswrite> of the
689             body content. This may be used to implement an upload progress indicator or
690             similar. It will be passed the total number of bytes of body content written
691             so far (i.e. excluding bytes consumed in the header).
692              
693             $on_body_write->( $written )
694              
695             =item max_redirects => INT
696              
697             Optional. How many levels of redirection to follow. If not supplied, will
698             default to the value given in the constructor.
699              
700             =item timeout => NUM
701              
702             =item stall_timeout => NUM
703              
704             Optional. Overrides the object's configured timeout values for this one
705             request. If not specified, will use the configured defaults.
706              
707             On a timeout, the returned future will fail with either C<timeout> or
708             C<stall_timeout> as the operation name.
709              
710             ( $message, "timeout" ) = $f->failure
711              
712             =back
713              
714             =head2 do_request (void)
715              
716             $http->do_request( %args )
717              
718             When not returning a future, the following extra arguments are used as
719             callbacks instead:
720              
721             =over 8
722              
723             =item on_response => CODE
724              
725             A callback that is invoked when a response to this request has been received.
726             It will be passed an L<HTTP::Response> object containing the response the
727             server sent.
728              
729             $on_response->( $response )
730              
731             =item on_header => CODE
732              
733             Alternative to C<on_response>. A callback that is invoked when the header of a
734             response has been received. It is expected to return a C<CODE> reference for
735             handling chunks of body content. This C<CODE> reference will be invoked with
736             no arguments once the end of the request has been reached, and whatever it
737             returns will be used as the result of the returned C<Future>, if there is one.
738              
739             $on_body_chunk = $on_header->( $header )
740              
741             $on_body_chunk->( $data )
742             $response = $on_body_chunk->()
743              
744             =item on_error => CODE
745              
746             A callback that is invoked if an error occurs while trying to send the request
747             or obtain the response. It will be passed an error message.
748              
749             $on_error->( $message )
750              
751             If this is invoked because of a received C<4xx> or C<5xx> error code in an
752             HTTP response, it will be invoked with the response and request objects as
753             well.
754              
755             $on_error->( $message, $response, $request )
756              
757             =back
758              
759             =cut
760              
761             sub _do_one_request
762             {
763 137     137   1735 my $self = shift;
764 137         843 my %args = @_;
765              
766 137         357 my $host = delete $args{host};
767 137         283 my $port = delete $args{port};
768 137         257 my $request = delete $args{request};
769 137         256 my $SSL = delete $args{SSL};
770              
771 137         533 my $start_time = time;
772 137   66     723 my $stall_timeout = $args{stall_timeout} // $self->{stall_timeout};
773              
774 137         545 $self->prepare_request( $request );
775              
776 137 50 33     1377 if( $self->{require_SSL} and not $SSL ) {
777 0         0 return Future->fail( "Non-SSL request is not allowed with 'require_SSL' set",
778             http => undef, $request );
779             }
780              
781 137 100       709 if( $metrics ) {
782 33         460 $metrics->inc_gauge( requests_in_flight => );
783 33         3492 $metrics->inc_counter( requests => [ method => $request->method ] );
784             }
785              
786             return $self->get_connection(
787             host => $args{proxy_host} || $self->{proxy_host} || $host,
788             port => $args{proxy_port} || $self->{proxy_port} || $port,
789             is_proxy => !!( $args{proxy_host} || $self->{proxy_host} ),
790             ( defined $args{family} ? ( family => $args{family} ) : () ),
791             $SSL ? (
792             SSL => 1,
793             SSL_hostname => $host,
794 0         0 %{ $self->{ssl_params} },
795 0 0       0 ( map { m/^SSL_/ ? ( $_ => $args{$_} ) : () } keys %args ),
796             ) : (),
797             )->then( sub {
798 129     129   9350 my ( $conn ) = @_;
799 129 100       564 $args{on_ready} ? $args{on_ready}->( $conn )->then_done( $conn )
800             : Future->done( $conn )
801             })->then( sub {
802 129     129   11240 my ( $conn ) = @_;
803              
804             return $conn->request(
805             request => $request,
806             stall_timeout => $stall_timeout,
807             %args,
808             $SSL ? ( SSL => 1 ) : (),
809             on_done => sub {
810 115         1407 my ( $ctx ) = @_;
811              
812 115 100       518 if( $metrics ) {
813 1         15 $metrics->dec_gauge( requests_in_flight => );
814             # TODO: Some sort of error counter instead for errors?
815 1         61 $metrics->inc_counter( responses => [ method => $request->method, code => $ctx->resp_header->code ] );
816 1         110 $metrics->report_timer( request_duration => time - $start_time );
817 1         67 $metrics->report_distribution( response_bytes => $ctx->resp_bytes );
818             }
819             },
820 129 50       1217 );
821 137 50 66     5152 } );
    50 66        
      66        
822             }
823              
824             sub _should_redirect
825             {
826 115     115   304 my ( $response ) = @_;
827              
828             # Should only redirect if we actually have a Location header
829 115 100 100     380 return 0 unless $response->is_redirect and defined $response->header( "Location" );
830              
831 9         521 my $req_method = $response->request->method;
832             # Should only redirect GET or HEAD requests
833 9   66     240 return $req_method eq "GET" || $req_method eq "HEAD";
834             }
835              
836             sub _do_request
837             {
838 130     130   245 my $self = shift;
839 130         552 my %args = @_;
840              
841 130         320 my $host = $args{host};
842 130         321 my $port = $args{port};
843 130         546 my $ssl = $args{SSL};
844              
845 130         319 my $on_header = delete $args{on_header};
846              
847 130 100       403 my $redirects = defined $args{max_redirects} ? $args{max_redirects} : $self->{max_redirects};
848              
849 130         221 my $request = $args{request};
850 130         222 my $response;
851             my $reqf;
852             # Defeat prototype
853             my $future = &repeat( $self->_capture_weakself( sub {
854 137     137   8410 my $self = shift;
855 137         314 my ( $previous_f ) = @_;
856              
857 137 100       401 if( $previous_f ) {
858 7         42 my $previous_response = $previous_f->get;
859 7         123 $args{previous_response} = $previous_response;
860              
861 7         26 my $location = $previous_response->header( "Location" );
862              
863 7 100       317 if( $location =~ m{^http(?:s?)://} ) {
    50          
864             # skip
865             }
866             elsif( $location =~ m{^/} ) {
867 3 50       149 my $hostport = ( $port != HTTP_PORT ) ? "$host:$port" : $host;
868 3         17 $location = "http://$hostport" . $location;
869             }
870             else {
871 0         0 return Future->fail( "Unrecognised Location: $location", http => $previous_response, $request );
872             }
873              
874 7         56 my $loc_uri = URI->new( $location );
875 7 50       6671 unless( $loc_uri ) {
876 0         0 return Future->fail( "Unable to parse '$location' as a URI", http => $previous_response, $request );
877             }
878              
879 7         101 $self->debug_printf( "REDIRECT $loc_uri" );
880              
881 7 100       113 $args{on_redirect}->( $previous_response, $location ) if $args{on_redirect};
882              
883 7         53 %args = $self->_make_request_for_uri( $loc_uri, %args );
884 7         24 $request = $args{request};
885              
886 7         15 undef $host; undef $port; undef $ssl;
  7         12  
  7         30  
887             }
888              
889 137         466 my $uri = $request->uri;
890 137 100 66     1365 if( defined $uri->scheme and $uri->scheme =~ m/^http(s?)$/ ) {
891 96 100       4748 $host = $uri->host if !defined $host;
892 96 100       579 $port = $uri->port if !defined $port;
893 96         632 $ssl = ( $uri->scheme eq "https" );
894             }
895              
896 137 50       2597 defined $host or croak "Expected 'host'";
897 137 50       392 defined $port or $port = ( $ssl ? HTTPS_PORT : HTTP_PORT );
    100          
898              
899             return $reqf = $self->_do_one_request(
900             host => $host,
901             port => $port,
902             SSL => $ssl,
903             %args,
904             on_header => $self->_capture_weakself( sub {
905 117         1290 my $self = shift;
906 117         312 ( $response ) = @_;
907              
908             # Consume and discard the entire body of a redirect
909             return sub {
910 9 50       42 return if @_;
911 9         21 return $response;
912 117 100 100     663 } if $redirects and $response->is_redirect;
913              
914 108         1389 return $on_header->( $response );
915 137         1075 } ),
916             );
917             } ),
918             while => sub {
919 130     130   13707 my $f = shift;
920 130 100 66     643 return 0 if $f->failure or $f->is_cancelled;
921 115   100     1916 return _should_redirect( $response ) && $redirects--;
922 130         1376 } );
923              
924 130 100       18561 if( $self->{fail_on_error} ) {
925             $future = $future->then_with_f( sub {
926 3     3   303 my ( $f, $resp ) = @_;
927 3         10 my $code = $resp->code;
928              
929 3 100       41 if( $code =~ m/^[45]/ ) {
930 2         7 my $message = $resp->message;
931 2         21 $message =~ s/\r$//; # HTTP::Message bug
932              
933 2         13 return Future->fail( "$code $message", http => $resp, $request );
934             }
935              
936 1         3 return $f;
937 3         23 });
938             }
939              
940 130         615 return $future;
941             }
942              
943             sub do_request
944             {
945 130     130 1 535216 my $self = shift;
946 130         690 my %args = @_;
947              
948 130 100       1574 if( my $uri = delete $args{uri} ) {
    50          
949 82         971 %args = $self->_make_request_for_uri( $uri, %args );
950             }
951             elsif( !defined $args{request} ) {
952 0         0 croak "Require either 'uri' or 'request' argument";
953             }
954              
955 130 100 66     900 if( $args{on_header} ) {
    50          
956             # ok
957             }
958             elsif( $args{on_response} or defined wantarray ) {
959             $args{on_header} = sub {
960 103     103   252 my ( $response ) = @_;
961             return sub {
962 174 100       437 if( @_ ) {
963 73         420 $response->add_content( @_ );
964             }
965             else {
966 101         336 return $response;
967             }
968 103         631 };
969             }
970 125         791 }
971             else {
972 0         0 croak "Expected 'on_response' or 'on_header' as CODE ref or to return a Future";
973             }
974              
975 130         338 my $on_error = delete $args{on_error};
976 130 100       404 my $timeout = defined $args{timeout} ? $args{timeout} : $self->{timeout};
977              
978 130         623 my $future = $self->_do_request( %args );
979              
980 130 100       444 if( defined $timeout ) {
981             $future = Future->wait_any(
982             $future,
983             $self->loop->timeout_future( after => $timeout )
984 34     4   123 ->transform( fail => sub { "Timed out", timeout => } ),
  4         493827  
985             );
986             }
987              
988             $future->on_done( $self->_capture_weakself( sub {
989 106     106   17496 my $self = shift;
990 106         215 my $response = shift;
991 106         427 $self->process_response( $response );
992 130         37126 } ) );
993              
994             $future->on_fail( sub {
995 8     8   1969 my ( $message, $name, @rest ) = @_;
996 8         32 $on_error->( $message, @rest );
997 130 100       4746 }) if $on_error;
998              
999 130 100       2073 if( my $on_response = delete $args{on_response} ) {
1000             $future->on_done( sub {
1001 74     74   1995 my ( $response ) = @_;
1002 74         268 $on_response->( $response );
1003 81         437 });
1004             }
1005              
1006             # DODGY HACK:
1007             # In void context we'll lose reference on the ->wait_any Future, so the
1008             # timeout logic will never happen. So lets purposely create a cycle by
1009             # capturing the $future in on_done/on_fail closures within itself. This
1010             # conveniently clears them out to drop the ref when done.
1011 130 100       2072 return $future if defined wantarray;
1012              
1013 54     54   291 $future->on_ready( sub { undef $future } );
  54         8990  
1014             }
1015              
1016             sub _make_request_for_uri
1017             {
1018 89     89   173 my $self = shift;
1019 89         305 my ( $uri, %args ) = @_;
1020              
1021 89 100 33     933 if( !ref $uri ) {
    50          
1022 14         93 $uri = URI->new( $uri );
1023             }
1024             elsif( blessed $uri and !$uri->isa( "URI" ) ) {
1025 0         0 croak "Expected 'uri' as a URI reference";
1026             }
1027              
1028 89   100     34267 my $method = delete $args{method} || "GET";
1029              
1030 89         447 $args{host} = $uri->host;
1031 89         4117 $args{port} = $uri->port;
1032              
1033 89         2407 my $request;
1034              
1035 89 100       421 if( $method eq "POST" ) {
1036 2 50       8 defined $args{content} or croak "Expected 'content' with POST method";
1037              
1038             # Lack of content_type didn't used to be a failure condition:
1039             ref $args{content} or defined $args{content_type} or
1040 2 50 66     11 carp "No 'content_type' was given with 'content'";
1041              
1042             # This will automatically encode a form for us
1043 2         10 $request = HTTP::Request::Common::POST( $uri, Content => $args{content}, Content_Type => $args{content_type} );
1044             }
1045             else {
1046 87         1004 $request = HTTP::Request->new( $method, $uri );
1047 87 100       6745 if( defined $args{content} ) {
1048 3 50       12 defined $args{content_type} or carp "No 'content_type' was given with 'content'";
1049              
1050 3         15 $request->content( $args{content} );
1051 3   50     115 $request->content_type( $args{content_type} // "" );
1052             }
1053             }
1054              
1055 89         1291 $request->protocol( "HTTP/1.1" );
1056 89 100       968 if( $args{port} != $uri->default_port ) {
1057 16         163 $request->header( Host => "$args{host}:$args{port}" );
1058             }
1059             else {
1060 73         542 $request->header( Host => "$args{host}" );
1061             }
1062              
1063 89         5760 my $headers = $args{headers};
1064 89 100 100     530 if( $headers and reftype $headers eq "ARRAY" ) {
    100 66        
1065 1         19 $request->header( @$_ ) for pairs @$headers;
1066             }
1067             elsif( $headers and reftype $headers eq "HASH" ) {
1068 1         7 $request->header( $_, $headers->{$_} ) for keys %$headers;
1069             }
1070              
1071 89         416 my ( $user, $pass );
1072              
1073 89 100 66     342 if( defined $uri->userinfo ) {
    100          
1074 1         23 ( $user, $pass ) = split( m/:/, $uri->userinfo, 2 );
1075             }
1076             elsif( defined $args{user} and defined $args{pass} ) {
1077 1         24 $user = $args{user};
1078 1         3 $pass = $args{pass};
1079             }
1080              
1081 89 100 66     2154 if( defined $user and defined $pass ) {
1082 2         15 $request->authorization_basic( $user, $pass );
1083             }
1084              
1085 89         1728 $args{request} = $request;
1086              
1087 89         687 return %args;
1088             }
1089              
1090             =head2 GET, HEAD, PUT, ...
1091              
1092             $response = await $http->GET( $uri, %args );
1093              
1094             $response = await $http->HEAD( $uri, %args );
1095              
1096             $response = await $http->PUT( $uri, $content, %args );
1097              
1098             $response = await $http->POST( $uri, $content, %args );
1099              
1100             $response = await $http->PATCH( $uri, $content, %args );
1101              
1102             Convenient wrappers for performing C<GET>, C<HEAD>, C<PUT>, C<POST> or
1103             C<PATCH> requests with a C<URI> object and few if any other arguments,
1104             returning a C<Future>.
1105              
1106             Remember that C<POST> with non-form data (as indicated by a plain scalar
1107             instead of an C<ARRAY> reference of form data name/value pairs) needs a
1108             C<content_type> key in C<%args>.
1109              
1110             =cut
1111              
1112             sub GET
1113             {
1114 8     8 1 3197 my $self = shift;
1115 8         21 my ( $uri, @args ) = @_;
1116 8         35 return $self->do_request( method => "GET", uri => $uri, @args );
1117             }
1118              
1119             sub HEAD
1120             {
1121 0     0 1 0 my $self = shift;
1122 0         0 my ( $uri, @args ) = @_;
1123 0         0 return $self->do_request( method => "HEAD", uri => $uri, @args );
1124             }
1125              
1126             sub PUT
1127             {
1128 0     0 1 0 my $self = shift;
1129 0         0 my ( $uri, $content, @args ) = @_;
1130 0         0 return $self->do_request( method => "PUT", uri => $uri, content => $content, @args );
1131             }
1132              
1133             sub POST
1134             {
1135 0     0 0 0 my $self = shift;
1136 0         0 my ( $uri, $content, @args ) = @_;
1137 0         0 return $self->do_request( method => "POST", uri => $uri, content => $content, @args );
1138             }
1139              
1140             sub PATCH
1141             {
1142 0     0 0 0 my $self = shift;
1143 0         0 my ( $uri, $content, @args ) = @_;
1144 0         0 return $self->do_request( method => "PATCH", uri => $uri, content => $content, @args );
1145             }
1146              
1147             =head1 SUBCLASS METHODS
1148              
1149             The following methods are intended as points for subclasses to override, to
1150             add extra functionality.
1151              
1152             =cut
1153              
1154             =head2 prepare_request
1155              
1156             $http->prepare_request( $request )
1157              
1158             Called just before the C<HTTP::Request> object is sent to the server.
1159              
1160             =cut
1161              
1162             sub prepare_request
1163             {
1164 137     137 1 270 my $self = shift;
1165 137         422 my ( $request ) = @_;
1166              
1167 137 100       662 $request->init_header( 'User-Agent' => $self->{user_agent} ) if length $self->{user_agent};
1168 137 100       1969 if( $self->{close_after_request} ) {
1169 1         5 $request->header( "Connection" => "close" );
1170             }
1171             else {
1172 136         793 $request->init_header( "Connection" => "keep-alive" );
1173             }
1174              
1175 137         6610 foreach ( pairs @{ $self->{headers} } ) {
  137         1305  
1176 3         70 $request->init_header( $_->key, $_->value );
1177             }
1178              
1179 137 100       638 $self->{cookie_jar}->add_cookie_header( $request ) if $self->{cookie_jar};
1180             }
1181              
1182             =head2 process_response
1183              
1184             $http->process_response( $response )
1185              
1186             Called after a non-redirect C<HTTP::Response> has been received from a server.
1187             The originating request will be set in the object.
1188              
1189             =cut
1190              
1191             sub process_response
1192             {
1193 106     106 1 215 my $self = shift;
1194 106         234 my ( $response ) = @_;
1195              
1196 106 100       598 $self->{cookie_jar}->extract_cookies( $response ) if $self->{cookie_jar};
1197             }
1198              
1199             =head1 CONTENT DECODING
1200              
1201             If the required decompression modules are installed and available, compressed
1202             content can be decoded. If the received C<Content-Encoding> is recognised and
1203             the required module is available, the content is transparently decoded and the
1204             decoded content is returned in the resulting response object, or passed to the
1205             data chunk handler. In this case, the original C<Content-Encoding> header will
1206             be deleted from the response, and its value will be available instead as
1207             C<X-Original-Content-Encoding>.
1208              
1209             The following content encoding types are recognised by these modules:
1210              
1211             =over 4
1212              
1213             =cut
1214              
1215             =item * gzip (q=0.7) and deflate (q=0.5)
1216              
1217             Recognised if L<Compress::Raw::Zlib> version 2.057 or newer is installed.
1218              
1219             =cut
1220              
1221             if( eval { require Compress::Raw::Zlib and $Compress::Raw::Zlib::VERSION >= 2.057 } ) {
1222             my $make_zlib_decoder = sub {
1223             my ( $bits ) = @_;
1224             my $inflator = Compress::Raw::Zlib::Inflate->new(
1225             -ConsumeInput => 0,
1226             -WindowBits => $bits,
1227             );
1228             sub {
1229             my $output;
1230             my $status = @_ ? $inflator->inflate( $_[0], $output )
1231             : $inflator->inflate( "", $output, 1 );
1232             die "$status\n" if $status && $status != Compress::Raw::Zlib::Z_STREAM_END();
1233             return $output;
1234             };
1235             };
1236              
1237             # RFC1950
1238             __PACKAGE__->register_decoder(
1239             deflate => 0.5, sub { $make_zlib_decoder->( 15 ) },
1240             );
1241              
1242             # RFC1952
1243             __PACKAGE__->register_decoder(
1244             gzip => 0.7, sub { $make_zlib_decoder->( Compress::Raw::Zlib::WANT_GZIP() ) },
1245             );
1246             }
1247              
1248             =item * bzip2 (q=0.8)
1249              
1250             Recognised if L<Compress::Bzip2> version 2.10 or newer is installed.
1251              
1252             =cut
1253              
1254             if( eval { require Compress::Bzip2 and $Compress::Bzip2::VERSION >= 2.10 } ) {
1255             __PACKAGE__->register_decoder(
1256             bzip2 => 0.8, sub {
1257             my $inflator = Compress::Bzip2::inflateInit();
1258             sub {
1259             return unless my ( $in ) = @_;
1260             my $out = $inflator->bzinflate( \$in );
1261             die $inflator->bzerror."\n" if !defined $out;
1262             return $out;
1263             };
1264             }
1265             );
1266             }
1267              
1268             =back
1269              
1270             Other content encoding types can be registered by calling the following method
1271              
1272             =head2 register_decoder
1273              
1274             Net::Async::HTTP->register_decoder( $name, $q, $make_decoder )
1275              
1276             Registers an encoding type called C<$name>, at the quality value C<$q>. In
1277             order to decode this encoding type, C<$make_decoder> will be invoked with no
1278             parameters, and expected to return a CODE reference to perform one instance of
1279             decoding.
1280              
1281             $decoder = $make_decoder->()
1282              
1283             This decoder will be invoked on string buffers to decode them until
1284             the end of stream is reached, when it will be invoked with no arguments.
1285              
1286             $content = $decoder->( $encoded_content )
1287             $content = $decoder->() # EOS
1288              
1289             =cut
1290              
1291             {
1292             my %DECODERS; # {$name} = [$q, $make_decoder]
1293              
1294             sub register_decoder
1295             {
1296 74     74 1 143 shift;
1297 74         181 my ( $name, $q, $make_decoder ) = @_;
1298              
1299 74         222 $DECODERS{$name} = [ $q, $make_decoder ];
1300             }
1301              
1302             sub can_decode
1303             {
1304 2     2 0 6 shift;
1305 2 50       7 if( @_ ) {
1306 2         6 my ( $name ) = @_;
1307              
1308 2 50       67 return unless my $d = $DECODERS{$name};
1309 2         10 return $d->[1]->();
1310             }
1311             else {
1312 0           my @ds = sort { $DECODERS{$b}[0] <=> $DECODERS{$a}[0] } keys %DECODERS;
  0            
1313 0           return join ", ", map { "$_;q=$DECODERS{$_}[0]" } @ds;
  0            
1314             }
1315             }
1316             }
1317              
1318             =head1 EXAMPLES
1319              
1320             =head2 Concurrent GET
1321              
1322             The C<Future>-returning C<GET> method makes it easy to await multiple URLs at
1323             once, by using the L<Future::Utils> C<fmap_void> utility
1324              
1325             use Future::AsyncAwait;
1326             use Future::Utils qw( fmap_void );
1327              
1328             my @URLs = ( ... );
1329              
1330             my $http = Net::Async::HTTP->new( ... );
1331             $loop->add( $http );
1332              
1333             my $future = fmap_void {
1334             my ( $url ) = @_;
1335             $http->GET( $url )
1336             ->on_done( sub {
1337             my $response = shift;
1338             say "$url succeeded: ", $response->code;
1339             say " Content-Type:", $response->content_type;
1340             } )
1341             ->on_fail( sub {
1342             my $failure = shift;
1343             say "$url failed: $failure";
1344             } );
1345             } foreach => \@URLs,
1346             concurrent => 5;
1347              
1348             await $future;
1349              
1350             =cut
1351              
1352             =head1 SEE ALSO
1353              
1354             =over 4
1355              
1356             =item *
1357              
1358             L<http://tools.ietf.org/html/rfc2616> - Hypertext Transfer Protocol -- HTTP/1.1
1359              
1360             =back
1361              
1362             =head1 SPONSORS
1363              
1364             Parts of this code, or bugfixes to it were paid for by
1365              
1366             =over 2
1367              
1368             =item *
1369              
1370             SocialFlow L
1371              
1372             =item *
1373              
1374             Shadowcat Systems L
1375              
1376             =item *
1377              
1378             NET-A-PORTER L
1379              
1380             =item *
1381              
1382             Cisco L
1383              
1384             =back
1385              
1386             =head1 AUTHOR
1387              
1388             Paul Evans <leonerd@leonerd.org.uk>
1389              
1390             =cut
1391              
1392             0x55AA;