File Coverage

blib/lib/Net/Async/HTTP.pm
Criterion Covered Total %
statement 333 371 89.7
branch 125 164 76.2
condition 55 85 64.7
subroutine 56 59 94.9
pod 9 12 75.0
total 578 691 83.6


line stmt bran cond sub pod time code
1             # You may distribute under the terms of either the GNU General Public License
2             # or the Artistic License (the same terms as Perl itself)
3             #
4             # (C) Paul Evans, 2008-2019 -- leonerd@leonerd.org.uk
5              
6             package Net::Async::HTTP;
7              
8 37     37   3713801 use strict;
  37         427  
  37         1098  
9 37     37   228 use warnings;
  37         65  
  37         869  
10 37     37   692 use 5.010; # //
  37         131  
11 37     37   211 use base qw( IO::Async::Notifier );
  37         64  
  37         20426  
12              
13             our $VERSION = '0.47';
14              
15             our $DEFAULT_UA = "Perl + " . __PACKAGE__ . "/$VERSION";
16             our $DEFAULT_MAXREDIR = 3;
17             our $DEFAULT_MAX_IN_FLIGHT = 4;
18             our $DEFAULT_MAX_CONNS_PER_HOST = $ENV{NET_ASYNC_HTTP_MAXCONNS} // 1;
19              
20 37     37   517406 use Carp;
  37         84  
  37         1939  
21              
22 37     37   17201 use Net::Async::HTTP::Connection;
  37         135  
  37         1289  
23              
24 37     37   17686 use HTTP::Request;
  37         38940  
  37         1032  
25 37     37   17456 use HTTP::Request::Common qw();
  37         79153  
  37         1386  
26 37     37   250 use URI;
  37         468  
  37         1319  
27              
28 37     37   228 use IO::Async::Stream 0.59;
  37         644  
  37         1052  
29 37     37   864 use IO::Async::Loop 0.59; # ->connect( handle ) ==> $stream
  37         8962  
  37         875  
30              
31 37     37   198 use Future 0.28; # ->set_label
  37         521  
  37         928  
32 37     37   205 use Future::Utils 0.16 qw( repeat );
  37         604  
  37         2390  
33              
34 37         337 use Metrics::Any 0.05 '$metrics',
35             strict => 1,
36 37     37   237 name_prefix => [qw( http client )];
  37         699  
37              
38 37     37   3229 use Scalar::Util qw( blessed reftype );
  37         74  
  37         2185  
39 37     37   254 use Time::HiRes qw( time );
  37         111  
  37         376  
40 37     37   5066 use List::Util 1.29 qw( first pairs pairgrep );
  37         818  
  37         3306  
41 37         3021 use Socket 2.010 qw(
42             SOCK_STREAM IPPROTO_IP IP_TOS
43             IPTOS_LOWDELAY IPTOS_THROUGHPUT IPTOS_RELIABILITY IPTOS_MINCOST
44 37     37   278 );
  37         793  
45              
46 37     37   250 use constant HTTP_PORT => 80;
  37         71  
  37         2255  
47 37     37   253 use constant HTTPS_PORT => 443;
  37         78  
  37         2171  
48              
49 37     37   269 use constant READ_LEN => 64*1024; # 64 KiB
  37         86  
  37         2162  
50 37     37   234 use constant WRITE_LEN => 64*1024; # 64 KiB
  37         108  
  37         2111  
51              
52 37     37   254 use Struct::Dumb 0.07; # equallity operator overloading
  37         750  
  37         210  
53             struct Ready => [qw( future connecting )];
54              
55             =head1 NAME
56              
57             C<Net::Async::HTTP> - use HTTP with C<IO::Async>
58              
59             =head1 SYNOPSIS
60              
61             use IO::Async::Loop;
62             use Net::Async::HTTP;
63             use URI;
64              
65             my $loop = IO::Async::Loop->new();
66              
67             my $http = Net::Async::HTTP->new();
68              
69             $loop->add( $http );
70              
71             my ( $response ) = $http->do_request(
72             uri => URI->new( "http://www.cpan.org/" ),
73             )->get;
74              
75             print "Front page of http://www.cpan.org/ is:\n";
76             print $response->as_string;
77              
78             =head1 DESCRIPTION
79              
80             This object class implements an asynchronous HTTP user agent. It sends
81             requests to servers, returning L<Future> instances to yield responses when
82             they are received. The object supports multiple concurrent connections to
83             servers, and allows multiple requests in the pipeline to any one connection.
84             Normally, only one such object will be needed per program to support any
85             number of requests.
86              
87             As well as using futures the module also supports a callback-based interface.
88              
89             This module optionally supports SSL connections, if L<IO::Async::SSL> is
90             installed. If so, SSL can be requested either by passing a URI with the
91             C<https> scheme, or by passing a true value as the C<SSL> parameter.
92              
93             =head2 Connection Pooling
94              
95             There are three ways in which connections to HTTP server hosts are managed by
96             this object, controlled by the value of C<max_connections_per_host>. This
97             controls when new connections are established to servers, as compared to
98             waiting for existing connections to be free, as new requests are made to them.
99              
100             They are:
101              
102             =over 2
103              
104             =item max_connections_per_host = 1
105              
106             This is the default setting. In this mode, there will be one connection per
107             host on which there are active or pending requests. If new requests are made
108             while an existing one is outstanding, they will be queued to wait for it.
109              
110             If pipelining is active on the connection (because both the C<pipeline> option
111             is true and the connection is known to be an HTTP/1.1 server), then requests
112             will be pipelined into the connection awaiting their response. If not, they
113             will be queued awaiting a response to the previous before sending the next.
114              
115             =item max_connections_per_host > 1
116              
117             In this mode, there can be more than one connection per host. If a new request
118             is made, it will try to re-use idle connections if there are any, or if they
119             are all busy it will create a new connection to the host, up to the configured
120             limit.
121              
122             =item max_connections_per_host = 0
123              
124             In this mode, there is no upper limit to the number of connections per host.
125             Every new request will try to reuse an idle connection, or else create a new
126             one if all the existing ones are busy.
127              
128             =back
129              
130             These modes all apply per hostname / server port pair; they do not affect the
131             behaviour of connections made to differing hostnames, or differing ports on
132             the same hostname.
133              
134             =cut
135              
136             $metrics->make_gauge( requests_in_flight =>
137             description => "Count of the number of requests sent that have not yet been completed",
138             # no labels
139             );
140             $metrics->make_counter( requests =>
141             description => "Number of HTTP requests sent",
142             labels => [qw( method )],
143             );
144             $metrics->make_counter( responses =>
145             description => "Number of HTTP responses received",
146             labels => [qw( method code )],
147             );
148             $metrics->make_timer( request_duration =>
149             description => "Duration of time spent waiting for responses",
150             # no labels
151             );
152             $metrics->make_distribution( response_bytes =>
153             name => [qw( response bytes )],
154             description => "The size in bytes of responses received",
155             units => "bytes",
156             # no labels
157             );
158              
159             sub _init
160             {
161 35     35   177651 my $self = shift;
162              
163 35         179 $self->{connections} = {}; # { "$host:$port" } -> [ @connections ]
164              
165 35         103 $self->{read_len} = READ_LEN;
166 35         86 $self->{write_len} = WRITE_LEN;
167              
168 35         81 $self->{max_connections_per_host} = $DEFAULT_MAX_CONNS_PER_HOST;
169              
170 35         108 $self->{ssl_params} = {};
171             }
172              
173             sub _remove_from_loop
174             {
175 5     5   3777 my $self = shift;
176              
177 5         12 foreach my $conn ( map { @$_ } values %{ $self->{connections} } ) {
  6         21  
  5         21  
178 4         18 $conn->close;
179             }
180              
181 5         135 $self->SUPER::_remove_from_loop( @_ );
182             }
183              
184             =head1 PARAMETERS
185              
186             The following named parameters may be passed to C<new> or C<configure>:
187              
188             =head2 user_agent => STRING
189              
190             A string to set in the C<User-Agent> HTTP header. If not supplied, one will
191             be constructed that declares C<Net::Async::HTTP> and the version number.
192              
193             =head2 headers => ARRAY or HASH
194              
195             I
196              
197             A set of extra headers to apply to every outgoing request. May be specified
198             either as an even-sized array containing key/value pairs, or a hash.
199              
200             Individual header values may be added or changed without replacing the entire
201             set by using the C<configure> method and passing a key called C<+headers>:
202              
203             $http->configure( +headers => { One_More => "Key" } );
204              
205             =head2 max_redirects => INT
206              
207             Optional. How many levels of redirection to follow. If not supplied, will
208             default to 3. Give 0 to disable redirection entirely.
209              
210             =head2 max_in_flight => INT
211              
212             Optional. The maximum number of in-flight requests to allow per host when
213             pipelining is enabled and supported on that host. If more requests are made
214             over this limit they will be queued internally by the object and not sent to
215             the server until responses are received. If not supplied, will default to 4.
216             Give 0 to disable the limit entirely.
217              
218             =head2 max_connections_per_host => INT
219              
220             Optional. Controls the maximum number of connections per hostname/server port
221             pair, before requests will be queued awaiting one to be free. Give 0 to
222             disable the limit entirely. See also the L</Connection Pooling> section
223             documented above.
224              
225             Currently, if not supplied it will default to 1. However, it has been found in
226             practice that most programs will raise this limit to something higher, perhaps
227             3 or 4. Therefore, a future version of this module may set a higher value.
228              
229             To test if your application will handle this correctly, you can set a
230             different default by setting an environment variable:
231              
232             $ NET_ASYNC_HTTP_MAXCONNS=3 perl ...
233              
234             =head2 timeout => NUM
235              
236             Optional. How long in seconds to wait before giving up on a request. If not
237             supplied then no default will be applied, and no timeout will take place.
238              
239             =head2 stall_timeout => NUM
240              
241             Optional. How long in seconds to wait after each write or read of data on a
242             socket, before giving up on a request. This may be more useful than
243             C<timeout> on large-file operations, as it will not time out provided that
244             regular progress is still being made.
245              
246             =head2 proxy_host => STRING
247              
248             =head2 proxy_port => INT
249              
250             Optional. Default values to apply to each C<do_request> method.
251              
252             =head2 cookie_jar => HTTP::Cookies
253              
254             Optional. A reference to a L<HTTP::Cookies> object. Will be used to set
255             cookies in requests and store them from responses.
256              
257             =head2 pipeline => BOOL
258              
259             Optional. If false, disables HTTP/1.1-style request pipelining.
260              
261             =head2 close_after_request => BOOL
262              
263             I
264              
265             Optional. If true, will set the C<Connection: close> header on outgoing
266             requests and disable pipelining, thus making every request use a new
267             connection.
268              
269             =head2 family => INT
270              
271             =head2 local_host => STRING
272              
273             =head2 local_port => INT
274              
275             =head2 local_addrs => ARRAY
276              
277             =head2 local_addr => HASH or ARRAY
278              
279             Optional. Parameters to pass on to the C<connect> method used to connect
280             sockets to HTTP servers. Sets the socket family and local socket address to
281             C<bind> to. For more detail, see the documentation in
282             L<IO::Async::Connector>.
283              
284             =head2 fail_on_error => BOOL
285              
286             Optional. Affects the behaviour of response handling when a C<4xx> or C<5xx>
287             response code is received. When false, these responses will be processed as
288             other responses and yielded as the result of the future, or passed to the
289             C<on_response> callback. When true, such an error response causes the future
290             to fail, or the C<on_error> callback to be invoked.
291              
292             The HTTP response and request objects will be passed as well as the code and
293             message, and the failure name will be C<http>.
294              
295             ( $code_message, "http", $response, $request ) = $f->failure
296              
297             $on_error->( "$code $message", $response, $request )
298              
299             =head2 read_len => INT
300              
301             =head2 write_len => INT
302              
303             Optional. Used to set the reading and writing buffer lengths on the underlying
304             C<IO::Async::Stream> objects that represent connections to the server. If not
305             defined, a default of 64 KiB will be used.
306              
307             =head2 ip_tos => INT or STRING
308              
309             Optional. Used to set the C<IP_TOS> socket option on client sockets. If given,
310             should either be a C<IPTOS_*> constant, or one of the string names
311             C<lowdelay>, C<throughput>, C<reliability> or C<mincost>. If undefined or left
312             absent, no option will be set.
313              
314             =head2 decode_content => BOOL
315              
316             Optional. If true, incoming responses that have a recognised
317             C<Content-Encoding> are handled by the module, and decompressed content is
318             passed to the body handling callback or returned in the C<HTTP::Response>. See
319             L</CONTENT DECODING> below for details of which encoding types are recognised.
320             When this option is enabled, outgoing requests also have the
321             C<Accept-Encoding> header added to them if it does not already exist.
322              
323             Currently the default is false, because this behaviour is new, but it may
324             default to true in a later version. Applications which care which behaviour
325             applies should set this to a defined value to ensure it doesn't change.
326              
327             =head2 SSL_*
328              
329             Additionally, any parameters whose names start with C<SSL_> will be stored and
330             passed on requests to perform SSL requests. This simplifies configuration of
331             common SSL parameters.
332              
333             =head2 require_SSL => BOOL
334              
335             Optional. If true, then any attempt to make a request that does not use SSL
336             (either by calling C<do_request>, or as a result of a redirection) will
337             immediately fail.
338              
339             =head2 SOCKS_*
340              
341             I
342              
343             Additionally, any parameters whose names start with C<SOCKS_> will be stored
344             and used by L<Net::Async::SOCKS> to establish connections via a configured
345             proxy.
346              
347             =cut
348              
349             sub configure
350             {
351 47     47 1 12377 my $self = shift;
352 47         149 my %params = @_;
353              
354 47         207 foreach (qw( user_agent max_redirects max_in_flight max_connections_per_host
355             timeout stall_timeout proxy_host proxy_port cookie_jar pipeline
356             close_after_request family local_host local_port local_addrs local_addr
357             fail_on_error read_len write_len decode_content require_SSL ))
358             {
359 987 100       1786 $self->{$_} = delete $params{$_} if exists $params{$_};
360             }
361              
362             # Always store internally as ARRAyref
363 47 100       180 if( my $headers = delete $params{headers} ) {
364 1 50       15 @{ $self->{headers} } =
  1 50       3  
365             ( ref $headers eq "ARRAY" ) ? @$headers :
366             ( ref $headers eq "HASH" ) ? %$headers :
367             croak "Expected 'headers' to be either ARRAY or HASH reference";
368             }
369              
370 47 100       210 if( my $more = delete $params{"+headers"} ) {
371 1 50       9 my @more =
    50          
372             ( ref $more eq "ARRAY" ) ? @$more :
373             ( ref $more eq "HASH" ) ? %$more :
374             croak "Expected '+headers' to be either ARRAY or HASH reference";
375 1         3 my %to_remove = @more;
376              
377 1         2 my $headers = $self->{headers};
378 1     1   8 @$headers = ( ( pairgrep { !exists $to_remove{$a} } @$headers ), @more );
  1         4  
379             }
380              
381 47         177 foreach ( grep { m/^SSL_/ } keys %params ) {
  2         13  
382 0         0 $self->{ssl_params}{$_} = delete $params{$_};
383             }
384              
385 47         147 foreach ( grep { m/^SOCKS_/ } keys %params ) {
  2         8  
386 0         0 $self->{socks_params}{$_} = delete $params{$_};
387             }
388              
389 47 50       150 if( exists $params{ip_tos} ) {
390             # TODO: This conversion should live in IO::Async somewhere
391 0         0 my $ip_tos = delete $params{ip_tos};
392 0 0 0     0 $ip_tos = IPTOS_LOWDELAY if defined $ip_tos and $ip_tos eq "lowdelay";
393 0 0 0     0 $ip_tos = IPTOS_THROUGHPUT if defined $ip_tos and $ip_tos eq "throughput";
394 0 0 0     0 $ip_tos = IPTOS_RELIABILITY if defined $ip_tos and $ip_tos eq "reliability";
395 0 0 0     0 $ip_tos = IPTOS_MINCOST if defined $ip_tos and $ip_tos eq "mincost";
396 0         0 $self->{ip_tos} = $ip_tos;
397             }
398              
399 47         278 $self->SUPER::configure( %params );
400              
401 47 100       713 defined $self->{user_agent} or $self->{user_agent} = $DEFAULT_UA;
402 47 100       172 defined $self->{max_redirects} or $self->{max_redirects} = $DEFAULT_MAXREDIR;
403 47 100       167 defined $self->{max_in_flight} or $self->{max_in_flight} = $DEFAULT_MAX_IN_FLIGHT;
404 47 100       191 defined $self->{pipeline} or $self->{pipeline} = 1;
405             }
406              
407             =head1 METHODS
408              
409             The following methods documented with a trailing call to C<< ->get >> return
410             L<Future> instances.
411              
412             When returning a Future, the following methods all indicate HTTP-level errors
413             using the Future failure name of C<http>. If the error relates to a specific
414             response it will be included. The original request is also included.
415              
416             $f->fail( $message, "http", $response, $request )
417              
418             =cut
419              
420             sub connect_connection
421             {
422 98     98 0 201 my $self = shift;
423 98         433 my %args = @_;
424              
425 98         228 my $conn = delete $args{conn};
426              
427 98         216 my $host = delete $args{host};
428 98         187 my $port = delete $args{port};
429              
430 98         180 my $on_error = $args{on_error};
431              
432 98 50       302 if( my $socks_params = $self->{socks_params} ) {
433 0         0 require Net::Async::SOCKS;
434 0         0 Net::Async::SOCKS->VERSION( '0.003' );
435              
436 0         0 unshift @{ $args{extensions} }, "SOCKS";
  0         0  
437 0         0 $args{$_} = $socks_params->{$_} for keys %$socks_params;
438             }
439              
440 98 50       284 if( $args{SSL} ) {
441 0         0 require IO::Async::SSL;
442 0         0 IO::Async::SSL->VERSION( '0.12' ); # 0.12 has ->connect(handle) bugfix
443              
444 0         0 unshift @{ $args{extensions} }, "SSL";
  0         0  
445             }
446              
447             my $f = $conn->connect(
448             host => $host,
449             service => $port,
450             family => ( $args{family} || $self->{family} || 0 ),
451 392 50       1198 ( map { defined $self->{$_} ? ( $_ => $self->{$_} ) : () }
452             qw( local_host local_port local_addrs local_addr ) ),
453              
454             %args,
455             )->on_done( sub {
456 90     90   2116 my ( $stream ) = @_;
457 90         463 $stream->configure(
458             notifier_name => "$host:$port,fd=" . $stream->read_handle->fileno,
459             );
460              
461             # Defend against ->setsockopt doing silly things like detecting SvPOK()
462 90 50       7077 $stream->read_handle->setsockopt( IPPROTO_IP, IP_TOS, $self->{ip_tos}+0 ) if defined $self->{ip_tos};
463              
464 90         307 $stream->ready;
465             })->on_fail( sub {
466 7     7   11222 $on_error->( $conn, "$host:$port - $_[0] failed [$_[-1]]" );
467 98   50     903 });
468              
469 98     98   34567 $f->on_ready( sub { undef $f } ); # intentionally cycle
  98         1946  
470             }
471              
472             sub get_connection
473             {
474 139     139 0 271 my $self = shift;
475 139         582 my %args = @_;
476              
477 139 50       755 my $loop = $self->get_loop or croak "Cannot ->get_connection without a Loop";
478              
479 139         946 my $host = $args{host};
480 139         245 my $port = $args{port};
481              
482 139         381 my $key = "$host:$port";
483 139   100     610 my $conns = $self->{connections}{$key} ||= [];
484 139   100     587 my $ready_queue = $self->{ready_queue}{$key} ||= [];
485              
486             # Have a look to see if there are any idle connected ones first
487 139         360 foreach my $conn ( @$conns ) {
488 51 100 100     185 $conn->is_idle and $conn->read_handle and return Future->done( $conn );
489             }
490              
491 126         280 my $ready = $args{ready};
492 126 100       537 $ready or push @$ready_queue, $ready =
493             Ready( $self->loop->new_future->set_label( "[ready $host:$port]" ), 0 );
494              
495 126         31355 my $f = $ready->future;
496              
497 126         861 my $max = $self->{max_connections_per_host};
498 126 100 100     608 if( $max and @$conns >= $max ) {
499 28         209 return $f;
500             }
501              
502             my $conn = Net::Async::HTTP::Connection->new(
503             notifier_name => "$host:$port,connecting",
504             ready_queue => $ready_queue,
505 392         2229 ( map { $_ => $self->{$_} }
506             qw( max_in_flight read_len write_len decode_content ) ),
507             pipeline => ( $self->{pipeline} && !$self->{close_after_request} ),
508             is_proxy => $args{is_proxy},
509              
510             on_closed => sub {
511 65     65   242 my $conn = shift;
512 65         229 my $http = $conn->parent;
513              
514 65         595 $conn->remove_from_parent;
515 65         7455 @$conns = grep { $_ != $conn } @$conns;
  65         298  
516              
517 65 100       698 if( my $next = first { !$_->connecting } @$ready_queue ) {
  2         13  
518             # Requeue another connection attempt as there's still more to do
519 2         193 $http->get_connection( %args, ready => $next );
520             }
521             },
522 98   100     370 );
523              
524 98         9442 $self->add_child( $conn );
525 98         11864 push @$conns, $conn;
526              
527             $ready->connecting = $self->connect_connection( %args,
528             conn => $conn,
529             on_error => sub {
530 7     7   19 my $conn = shift;
531              
532 7 50       28 $f->fail( @_ ) unless $f->is_cancelled;
533              
534 7         1042 $conn->remove_from_parent;
535 7         942 @$conns = grep { $_ != $conn } @$conns;
  10         36  
536 7         17 @$ready_queue = grep { $_ != $ready } @$ready_queue;
  12         182  
537              
538 7 100       98 if( my $next = first { !$_->connecting } @$ready_queue ) {
  5         23  
539             # Requeue another connection attempt as there's still more to do
540 2         33 $self->get_connection( %args, ready => $next );
541             }
542             },
543             )->on_cancel( sub {
544 1     1   39 $conn->remove_from_parent;
545 1         147 @$conns = grep { $_ != $conn } @$conns;
  2         7  
546 98         884 });
547              
548 98         4005 return $f;
549             }
550              
551             =head2 $response = $http->do_request( %args )->get
552              
553             Send an HTTP request to a server, returning a L<Future> that will yield the
554             response. The request may be represented by an L<HTTP::Request> object, or a
555             L<URI> object, depending on the arguments passed.
556              
557             The following named arguments are used for C<HTTP::Request>s:
558              
559             =over 8
560              
561             =item request => HTTP::Request
562              
563             A reference to an C<HTTP::Request> object
564              
565             =item host => STRING
566              
567             Hostname of the server to connect to
568              
569             =item port => INT or STRING
570              
571             Optional. Port number or service of the server to connect to. If not defined,
572             will default to C<http> or C<https> depending on whether SSL is being used.
573              
574             =item family => INT
575              
576             Optional. Restricts the socket family for connecting. If not defined, will
577             default to the globally-configured value in the object.
578              
579             =item SSL => BOOL
580              
581             Optional. If true, an SSL connection will be used.
582              
583             =back
584              
585             The following named arguments are used for C<URI> requests:
586              
587             =over 8
588              
589             =item uri => URI or STRING
590              
591             A reference to a C<URI> object, or a plain string giving the request URI. If
592             the scheme is C<https> then an SSL connection will be used.
593              
594             =item method => STRING
595              
596             Optional. The HTTP method name. If missing, C<GET> is used.
597              
598             =item content => STRING or ARRAY ref
599              
600             Optional. The body content to use for C<PUT> or C<POST> requests.
601              
602             If this is a plain scalar it will be used directly, and a C<content_type>
603             field must also be supplied to describe it.
604              
605             If this is an ARRAY ref and the request method is C<POST>, it will be form
606             encoded. It should contain an even-sized list of field names and values. For
607             more detail see L<HTTP::Request::Common>.
608              
609             =item content_type => STRING
610              
611             The type of non-form data C<content>.
612              
613             =item user => STRING
614              
615             =item pass => STRING
616              
617             Optional. If both are given, the HTTP Basic Authorization header will be sent
618             with these details.
619              
620             =item headers => ARRAY|HASH
621              
622             Optional. If provided, contains additional HTTP headers to set on the
623             constructed request object. If provided as an ARRAY reference, it should
624             contain an even-sized list of name/value pairs.
625              
626             =item proxy_host => STRING
627              
628             =item proxy_port => INT
629              
630             Optional. Override the hostname or port number implied by the URI.
631              
632             =back
633              
634             For either request type, it takes the following arguments:
635              
636             =over 8
637              
638             =item request_body => STRING | CODE | Future
639              
640             Optional. Allows request body content to be generated by a future or
641             callback, rather than being provided as part of the C<request> object. This
642             can either be a plain string, a C<CODE> reference to a generator function, or
643             a future.
644              
645             As this is passed to the underlying L<IO::Async::Stream> C<write> method, the
646             usual semantics apply here. If passed a C<CODE> reference, it will be called
647             repeatedly whenever it's safe to write. The code should return C<undef>
648             to indicate completion. If passed a C<Future> it is expected to eventually
649             yield the body value.
650              
651             As with the C<content> parameter, the C<content_type> field should be
652             specified explicitly in the request header, as should the content length
653             (typically via the L<HTTP::Message> C<content_length> method). See also
654             F<examples/PUT.pl>.
655              
656             =item expect_continue => BOOL
657              
658             Optional. If true, sets the C<Expect> request header to the value
659             C<100-continue> and does not send the C<request_body> parameter until a
660             C<100 Continue> response is received from the server. If an error response is
661             received then the C<request_body> code, if present, will not be invoked.
662              
663             =item on_ready => CODE
664              
665             Optional. A callback that is invoked once a socket connection is established
666             with the HTTP server, but before the request is actually sent over it. This
667             may be used by the client code to inspect the socket, or perform any other
668             operations on it. This code is expected to return a C<Future>; only once that
669             has completed will the request cycle continue. If it fails, that failure is
670             propagated to the caller.
671              
672             $f = $on_ready->( $connection )
673              
674             =item on_redirect => CODE
675              
676             Optional. A callback that is invoked if a redirect response is received,
677             before the new location is fetched. It will be passed the response and the new
678             URL.
679              
680             $on_redirect->( $response, $location )
681              
682             =item on_body_write => CODE
683              
684             Optional. A callback that is invoked after each successful C<syswrite> of the
685             body content. This may be used to implement an upload progress indicator or
686             similar. It will be passed the total number of bytes of body content written
687             so far (i.e. excluding bytes consumed in the header).
688              
689             $on_body_write->( $written )
690              
691             =item max_redirects => INT
692              
693             Optional. How many levels of redirection to follow. If not supplied, will
694             default to the value given in the constructor.
695              
696             =item timeout => NUM
697              
698             =item stall_timeout => NUM
699              
700             Optional. Overrides the object's configured timeout values for this one
701             request. If not specified, will use the configured defaults.
702              
703             On a timeout, the returned future will fail with either C<timeout> or
704             C<stall_timeout> as the operation name.
705              
706             ( $message, "timeout" ) = $f->failure
707              
708             =back
709              
710             =head2 $http->do_request( %args )
711              
712             When not returning a future, the following extra arguments are used as
713             callbacks instead:
714              
715             =over 8
716              
717             =item on_response => CODE
718              
719             A callback that is invoked when a response to this request has been received.
720             It will be passed an L<HTTP::Response> object containing the response the
721             server sent.
722              
723             $on_response->( $response )
724              
725             =item on_header => CODE
726              
727             Alternative to C<on_response>. A callback that is invoked when the header of a
728             response has been received. It is expected to return a C<CODE> reference for
729             handling chunks of body content. This C<CODE> reference will be invoked with
730             no arguments once the end of the request has been reached, and whatever it
731             returns will be used as the result of the returned C<Future>, if there is one.
732              
733             $on_body_chunk = $on_header->( $header )
734              
735             $on_body_chunk->( $data )
736             $response = $on_body_chunk->()
737              
738             =item on_error => CODE
739              
740             A callback that is invoked if an error occurs while trying to send the request
741             or obtain the response. It will be passed an error message.
742              
743             $on_error->( $message )
744              
745             If this is invoked because of a received C<4xx> or C<5xx> error code in an
746             HTTP response, it will be invoked with the response and request objects as
747             well.
748              
749             $on_error->( $message, $response, $request )
750              
751             =back
752              
753             =cut
754              
755             sub _do_one_request
756             {
757 135     135   1714 my $self = shift;
758 135         720 my %args = @_;
759              
760 135         331 my $host = delete $args{host};
761 135         255 my $port = delete $args{port};
762 135         233 my $request = delete $args{request};
763 135         244 my $SSL = delete $args{SSL};
764              
765 135         488 my $start_time = time;
766 135   66     610 my $stall_timeout = $args{stall_timeout} // $self->{stall_timeout};
767              
768 135         489 $self->prepare_request( $request );
769              
770 135 50 33     1088 if( $self->{require_SSL} and not $SSL ) {
771 0         0 return Future->fail( "Non-SSL request is not allowed with 'require_SSL' set",
772             http => undef, $request );
773             }
774              
775 135 100       549 if( $metrics ) {
776 33         443 $metrics->inc_gauge( requests_in_flight => );
777 33         3205 $metrics->inc_counter( requests => [ method => $request->method ] );
778             }
779              
780             return $self->get_connection(
781             host => $args{proxy_host} || $self->{proxy_host} || $host,
782             port => $args{proxy_port} || $self->{proxy_port} || $port,
783             is_proxy => !!( $args{proxy_host} || $self->{proxy_host} ),
784             ( defined $args{family} ? ( family => $args{family} ) : () ),
785             $SSL ? (
786             SSL => 1,
787             SSL_hostname => $host,
788 0         0 %{ $self->{ssl_params} },
789 0 0       0 ( map { m/^SSL_/ ? ( $_ => $args{$_} ) : () } keys %args ),
790             ) : (),
791             )->then( sub {
792 127     127   8909 my ( $conn ) = @_;
793 127 100       517 $args{on_ready} ? $args{on_ready}->( $conn )->then_done( $conn )
794             : Future->done( $conn )
795             })->then( sub {
796 127     127   10774 my ( $conn ) = @_;
797              
798             return $conn->request(
799             request => $request,
800             stall_timeout => $stall_timeout,
801             %args,
802             $SSL ? ( SSL => 1 ) : (),
803             on_done => sub {
804 113         1298 my ( $ctx ) = @_;
805              
806 113 100       494 if( $metrics ) {
807 1         23 $metrics->dec_gauge( requests_in_flight => );
808             # TODO: Some sort of error counter instead for errors?
809 1         60 $metrics->inc_counter( responses => [ method => $request->method, code => $ctx->resp_header->code ] );
810 1         101 $metrics->report_timer( request_duration => time - $start_time );
811 1         71 $metrics->report_distribution( response_bytes => $ctx->resp_bytes );
812             }
813             },
814 127 50       1089 );
815 135 50 66     4621 } );
    50 66        
      66        
816             }
817              
818             sub _should_redirect
819             {
820 113     113   230 my ( $response ) = @_;
821              
822             # Should only redirect if we actually have a Location header
823 113 100 100     359 return 0 unless $response->is_redirect and defined $response->header( "Location" );
824              
825 9         473 my $req_method = $response->request->method;
826             # Should only redirect GET or HEAD requests
827 9   66     231 return $req_method eq "GET" || $req_method eq "HEAD";
828             }
829              
830             sub _do_request
831             {
832 128     128   239 my $self = shift;
833 128         482 my %args = @_;
834              
835 128         456 my $host = $args{host};
836 128         255 my $port = $args{port};
837 128         298 my $ssl = $args{SSL};
838              
839 128         247 my $on_header = delete $args{on_header};
840              
841 128 100       354 my $redirects = defined $args{max_redirects} ? $args{max_redirects} : $self->{max_redirects};
842              
843 128         239 my $request = $args{request};
844 128         219 my $response;
845             my $reqf;
846             # Defeat prototype
847             my $future = &repeat( $self->_capture_weakself( sub {
848 135     135   7219 my $self = shift;
849 135         289 my ( $previous_f ) = @_;
850              
851 135 100       386 if( $previous_f ) {
852 7         39 my $previous_response = $previous_f->get;
853 7         121 $args{previous_response} = $previous_response;
854              
855 7         124 my $location = $previous_response->header( "Location" );
856              
857 7 100       371 if( $location =~ m{^http(?:s?)://} ) {
    50          
858             # skip
859             }
860             elsif( $location =~ m{^/} ) {
861 3 50       88 my $hostport = ( $port != HTTP_PORT ) ? "$host:$port" : $host;
862 3         18 $location = "http://$hostport" . $location;
863             }
864             else {
865 0         0 return Future->fail( "Unrecognised Location: $location", http => $previous_response, $request );
866             }
867              
868 7         49 my $loc_uri = URI->new( $location );
869 7 50       7312 unless( $loc_uri ) {
870 0         0 return Future->fail( "Unable to parse '$location' as a URI", http => $previous_response, $request );
871             }
872              
873 7         94 $self->debug_printf( "REDIRECT $loc_uri" );
874              
875 7 100       118 $args{on_redirect}->( $previous_response, $location ) if $args{on_redirect};
876              
877 7         43 %args = $self->_make_request_for_uri( $loc_uri, %args );
878 7         22 $request = $args{request};
879              
880 7         13 undef $host; undef $port; undef $ssl;
  7         11  
  7         24  
881             }
882              
883 135         416 my $uri = $request->uri;
884 135 100 66     1166 if( defined $uri->scheme and $uri->scheme =~ m/^http(s?)$/ ) {
885 95 100       4224 $host = $uri->host if !defined $host;
886 95 100       577 $port = $uri->port if !defined $port;
887 95         625 $ssl = ( $uri->scheme eq "https" );
888             }
889              
890 135 50       2616 defined $host or croak "Expected 'host'";
891 135 50       328 defined $port or $port = ( $ssl ? HTTPS_PORT : HTTP_PORT );
    100          
892              
893             return $reqf = $self->_do_one_request(
894             host => $host,
895             port => $port,
896             SSL => $ssl,
897             %args,
898             on_header => $self->_capture_weakself( sub {
899 115         1088 my $self = shift;
900 115         276 ( $response ) = @_;
901              
902             # Consume and discard the entire body of a redirect
903             return sub {
904 9 50       27 return if @_;
905 9         53 return $response;
906 115 100 100     521 } if $redirects and $response->is_redirect;
907              
908 106         1165 return $on_header->( $response );
909 135         1024 } ),
910             );
911             } ),
912             while => sub {
913 128     128   12019 my $f = shift;
914 128 100 66     444 return 0 if $f->failure or $f->is_cancelled;
915 113   100     1601 return _should_redirect( $response ) && $redirects--;
916 128         1355 } );
917              
918 128 100       18276 if( $self->{fail_on_error} ) {
919             $future = $future->then_with_f( sub {
920 3     3   218 my ( $f, $resp ) = @_;
921 3         7 my $code = $resp->code;
922              
923 3 100       32 if( $code =~ m/^[45]/ ) {
924 2         4 my $message = $resp->message;
925 2         17 $message =~ s/\r$//; # HTTP::Message bug
926              
927 2         8 return Future->fail( "$code $message", http => $resp, $request );
928             }
929              
930 1         2 return $f;
931 3         13 });
932             }
933              
934 128         541 return $future;
935             }
936              
937             sub do_request
938             {
939 128     128 1 532092 my $self = shift;
940 128         615 my %args = @_;
941              
942 128 100       1326 if( my $uri = delete $args{uri} ) {
    50          
943 81         790 %args = $self->_make_request_for_uri( $uri, %args );
944             }
945             elsif( !defined $args{request} ) {
946 0         0 croak "Require either 'uri' or 'request' argument";
947             }
948              
949 128 100 66     697 if( $args{on_header} ) {
    50          
950             # ok
951             }
952             elsif( $args{on_response} or defined wantarray ) {
953             $args{on_header} = sub {
954 101     101   197 my ( $response ) = @_;
955             return sub {
956 172 100       455 if( @_ ) {
957 73         371 $response->add_content( @_ );
958             }
959             else {
960 99         273 return $response;
961             }
962 101         555 };
963             }
964 123         658 }
965             else {
966 0         0 croak "Expected 'on_response' or 'on_header' as CODE ref or to return a Future";
967             }
968              
969 128         340 my $on_error = delete $args{on_error};
970 128 100       382 my $timeout = defined $args{timeout} ? $args{timeout} : $self->{timeout};
971              
972 128         495 my $future = $self->_do_request( %args );
973              
974 128 100       393 if( defined $timeout ) {
975             $future = Future->wait_any(
976             $future,
977             $self->loop->timeout_future( after => $timeout )
978 33     4   77 ->transform( fail => sub { "Timed out", timeout => } ),
  4         491001  
979             );
980             }
981              
982             $future->on_done( $self->_capture_weakself( sub {
983 104     104   13623 my $self = shift;
984 104         171 my $response = shift;
985 104         309 $self->process_response( $response );
986 128         28464 } ) );
987              
988             $future->on_fail( sub {
989 8     8   1940 my ( $message, $name, @rest ) = @_;
990 8         40 $on_error->( $message, @rest );
991 128 100       4312 }) if $on_error;
992              
993 128 100       1831 if( my $on_response = delete $args{on_response} ) {
994             $future->on_done( sub {
995 72     72   1657 my ( $response ) = @_;
996 72         177 $on_response->( $response );
997 79         362 });
998             }
999              
1000             # DODGY HACK:
1001             # In void context we'll lose reference on the ->wait_any Future, so the
1002             # timeout logic will never happen. So lets purposely create a cycle by
1003             # capturing the $future in on_done/on_fail closures within itself. This
1004             # conveniently clears them out to drop the ref when done.
1005 128 100       1837 return $future if defined wantarray;
1006              
1007 53     53   218 $future->on_ready( sub { undef $future } );
  53         7542  
1008             }
1009              
1010             sub _make_request_for_uri
1011             {
1012 88     88   160 my $self = shift;
1013 88         258 my ( $uri, %args ) = @_;
1014              
1015 88 100 33     837 if( !ref $uri ) {
    50          
1016 14         146 $uri = URI->new( $uri );
1017             }
1018             elsif( blessed $uri and !$uri->isa( "URI" ) ) {
1019 0         0 croak "Expected 'uri' as a URI reference";
1020             }
1021              
1022 88   100     38919 my $method = delete $args{method} || "GET";
1023              
1024 88         386 $args{host} = $uri->host;
1025 88         4130 $args{port} = $uri->port;
1026              
1027 88         2477 my $request;
1028              
1029 88 100       317 if( $method eq "POST" ) {
1030 2 50       5 defined $args{content} or croak "Expected 'content' with POST method";
1031              
1032             # Lack of content_type didn't used to be a failure condition:
1033             ref $args{content} or defined $args{content_type} or
1034 2 50 66     9 carp "No 'content_type' was given with 'content'";
1035              
1036             # This will automatically encode a form for us
1037 2         7 $request = HTTP::Request::Common::POST( $uri, Content => $args{content}, Content_Type => $args{content_type} );
1038             }
1039             else {
1040 86         653 $request = HTTP::Request->new( $method, $uri );
1041 86 100       6384 if( defined $args{content} ) {
1042 2 50       7 defined $args{content_type} or carp "No 'content_type' was given with 'content'";
1043              
1044 2         6 $request->content( $args{content} );
1045 2   50     55 $request->content_type( $args{content_type} // "" );
1046             }
1047             }
1048              
1049 88         1113 $request->protocol( "HTTP/1.1" );
1050 88         940 $request->header( Host => $uri->host );
1051              
1052 88         7559 my $headers = $args{headers};
1053 88 100 100     527 if( $headers and reftype $headers eq "ARRAY" ) {
    100 66        
1054 1         12 $request->header( @$_ ) for pairs @$headers;
1055             }
1056             elsif( $headers and reftype $headers eq "HASH" ) {
1057 1         6 $request->header( $_, $headers->{$_} ) for keys %$headers;
1058             }
1059              
1060 88         288 my ( $user, $pass );
1061              
1062 88 100 66     308 if( defined $uri->userinfo ) {
    100          
1063 1         18 ( $user, $pass ) = split( m/:/, $uri->userinfo, 2 );
1064             }
1065             elsif( defined $args{user} and defined $args{pass} ) {
1066 1         18 $user = $args{user};
1067 1         2 $pass = $args{pass};
1068             }
1069              
1070 88 100 66     2156 if( defined $user and defined $pass ) {
1071 2         12 $request->authorization_basic( $user, $pass );
1072             }
1073              
1074 88         1282 $args{request} = $request;
1075              
1076 88         621 return %args;
1077             }
1078              
1079             =head2 $response = $http->GET( $uri, %args )->get
1080              
1081             =head2 $response = $http->HEAD( $uri, %args )->get
1082              
1083             =head2 $response = $http->PUT( $uri, $content, %args )->get
1084              
1085             =head2 $response = $http->POST( $uri, $content, %args )->get
1086              
1087             Convenient wrappers for performing C<GET>, C<HEAD>, C<PUT> or C<POST> requests
1088             with a C<URI> object and few if any other arguments, returning a C<Future>.
1089              
1090             Remember that C<POST> with non-form data (as indicated by a plain scalar
1091             instead of an C<ARRAY> reference of form data name/value pairs) needs a
1092             C<content_type> key in C<%args>.
1093              
1094             =cut
1095              
1096             sub GET
1097             {
1098 8     8 1 2855 my $self = shift;
1099 8         23 my ( $uri, @args ) = @_;
1100 8         29 return $self->do_request( method => "GET", uri => $uri, @args );
1101             }
1102              
1103             sub HEAD
1104             {
1105 0     0 1 0 my $self = shift;
1106 0         0 my ( $uri, @args ) = @_;
1107 0         0 return $self->do_request( method => "HEAD", uri => $uri, @args );
1108             }
1109              
1110             sub PUT
1111             {
1112 0     0 1 0 my $self = shift;
1113 0         0 my ( $uri, $content, @args ) = @_;
1114 0         0 return $self->do_request( method => "PUT", uri => $uri, content => $content, @args );
1115             }
1116              
1117             sub POST
1118             {
1119 0     0 1 0 my $self = shift;
1120 0         0 my ( $uri, $content, @args ) = @_;
1121 0         0 return $self->do_request( method => "POST", uri => $uri, content => $content, @args );
1122             }
1123              
1124             =head1 SUBCLASS METHODS
1125              
1126             The following methods are intended as points for subclasses to override, to
1127             add extra functionality.
1128              
1129             =cut
1130              
1131             =head2 $http->prepare_request( $request )
1132              
1133             Called just before the C<HTTP::Request> object is sent to the server.
1134              
1135             =cut
1136              
1137             sub prepare_request
1138             {
1139 135     135 1 233 my $self = shift;
1140 135         254 my ( $request ) = @_;
1141              
1142 135 100       574 $request->init_header( 'User-Agent' => $self->{user_agent} ) if length $self->{user_agent};
1143 135 100       1960 if( $self->{close_after_request} ) {
1144 1         5 $request->header( "Connection" => "close" );
1145             }
1146             else {
1147 134         707 $request->init_header( "Connection" => "keep-alive" );
1148             }
1149              
1150 135         6211 foreach ( pairs @{ $self->{headers} } ) {
  135         1017  
1151 3         56 $request->init_header( $_->key, $_->value );
1152             }
1153              
1154 135 100       551 $self->{cookie_jar}->add_cookie_header( $request ) if $self->{cookie_jar};
1155             }
1156              
1157             =head2 $http->process_response( $response )
1158              
1159             Called after a non-redirect C<HTTP::Response> has been received from a server.
1160             The originating request will be set in the object.
1161              
1162             =cut
1163              
1164             sub process_response
1165             {
1166 104     104 1 214 my $self = shift;
1167 104         185 my ( $response ) = @_;
1168              
1169 104 100       454 $self->{cookie_jar}->extract_cookies( $response ) if $self->{cookie_jar};
1170             }
1171              
1172             =head1 CONTENT DECODING
1173              
1174             If the required decompression modules are installed and available, compressed
1175             content can be decoded. If the received C<Content-Encoding> is recognised and
1176             the required module is available, the content is transparently decoded and the
1177             decoded content is returned in the resulting response object, or passed to the
1178             data chunk handler. In this case, the original C<Content-Encoding> header will
1179             be deleted from the response, and its value will be available instead as
1180             C<X-Original-Content-Encoding>.
1181              
1182             The following content encoding types are recognised by these modules:
1183              
1184             =over 4
1185              
1186             =cut
1187              
1188             =item * gzip (q=0.7) and deflate (q=0.5)
1189              
1190             Recognised if L<Compress::Raw::Zlib> version 2.057 or newer is installed.
1191              
1192             =cut
1193              
1194             if( eval { require Compress::Raw::Zlib and $Compress::Raw::Zlib::VERSION >= 2.057 } ) {
1195             my $make_zlib_decoder = sub {
1196             my ( $bits ) = @_;
1197             my $inflator = Compress::Raw::Zlib::Inflate->new(
1198             -ConsumeInput => 0,
1199             -WindowBits => $bits,
1200             );
1201             sub {
1202             my $output;
1203             my $status = @_ ? $inflator->inflate( $_[0], $output )
1204             : $inflator->inflate( "", $output, 1 );
1205             die "$status\n" if $status && $status != Compress::Raw::Zlib::Z_STREAM_END();
1206             return $output;
1207             };
1208             };
1209              
1210             # RFC1950
1211             __PACKAGE__->register_decoder(
1212             deflate => 0.5, sub { $make_zlib_decoder->( 15 ) },
1213             );
1214              
1215             # RFC1952
1216             __PACKAGE__->register_decoder(
1217             gzip => 0.7, sub { $make_zlib_decoder->( Compress::Raw::Zlib::WANT_GZIP() ) },
1218             );
1219             }
1220              
1221             =item * bzip2 (q=0.8)
1222              
1223             Recognised if L<Compress::Bzip2> version 2.10 or newer is installed.
1224              
1225             =cut
1226              
1227             if( eval { require Compress::Bzip2 and $Compress::Bzip2::VERSION >= 2.10 } ) {
1228             __PACKAGE__->register_decoder(
1229             bzip2 => 0.8, sub {
1230             my $inflator = Compress::Bzip2::inflateInit();
1231             sub {
1232             return unless my ( $in ) = @_;
1233             my $out = $inflator->bzinflate( \$in );
1234             die $inflator->bzerror."\n" if !defined $out;
1235             return $out;
1236             };
1237             }
1238             );
1239             }
1240              
1241             =back
1242              
1243             Other content encoding types can be registered by calling the following method
1244              
1245             =head2 Net::Async::HTTP->register_decoder( $name, $q, $make_decoder )
1246              
1247             Registers an encoding type called C<$name>, at the quality value C<$q>. In
1248             order to decode this encoding type, C<$make_decoder> will be invoked with no
1249             parameters, and expected to return a CODE reference to perform one instance of
1250             decoding.
1251              
1252             $decoder = $make_decoder->()
1253              
1254             This decoder will be invoked on string buffers to decode them until
1255             the end of stream is reached, when it will be invoked with no arguments.
1256              
1257             $content = $decoder->( $encoded_content )
1258             $content = $decoder->() # EOS
1259              
1260             =cut
1261              
1262             {
1263             my %DECODERS; # {$name} = [$q, $make_decoder]
1264              
1265             sub register_decoder
1266             {
1267 74     74 1 140 shift;
1268 74         181 my ( $name, $q, $make_decoder ) = @_;
1269              
1270 74         219 $DECODERS{$name} = [ $q, $make_decoder ];
1271             }
1272              
1273             sub can_decode
1274             {
1275 2     2 0 4 shift;
1276 2 50       6 if( @_ ) {
1277 2         5 my ( $name ) = @_;
1278              
1279 2 50       8 return unless my $d = $DECODERS{$name};
1280 2         10 return $d->[1]->();
1281             }
1282             else {
1283 0           my @ds = sort { $DECODERS{$b}[0] <=> $DECODERS{$a}[0] } keys %DECODERS;
  0            
1284 0           return join ", ", map { "$_;q=$DECODERS{$_}[0]" } @ds;
  0            
1285             }
1286             }
1287             }
1288              
1289             =head1 EXAMPLES
1290              
1291             =head2 Concurrent GET
1292              
1293             The C<Future>-returning C<GET> method makes it easy to await multiple URLs at
1294             once, by using the L<Future::Utils> C<fmap_void> utility
1295              
1296             use Future::Utils qw( fmap_void );
1297              
1298             my @URLs = ( ... );
1299              
1300             my $http = Net::Async::HTTP->new( ... );
1301             $loop->add( $http );
1302              
1303             my $future = fmap_void {
1304             my ( $url ) = @_;
1305             $http->GET( $url )
1306             ->on_done( sub {
1307             my $response = shift;
1308             say "$url succeeded: ", $response->code;
1309             say " Content-Type:", $response->content_type;
1310             } )
1311             ->on_fail( sub {
1312             my $failure = shift;
1313             say "$url failed: $failure";
1314             } );
1315             } foreach => \@URLs,
1316             concurrent => 5;
1317              
1318             $loop->await( $future );
1319              
1320             =cut
1321              
1322             =head1 SEE ALSO
1323              
1324             =over 4
1325              
1326             =item *
1327              
1328             L - Hypertext Transfer Protocol -- HTTP/1.1
1329              
1330             =back
1331              
1332             =head1 SPONSORS
1333              
1334             Parts of this code, or bugfixes to it were paid for by
1335              
1336             =over 2
1337              
1338             =item *
1339              
1340             SocialFlow L
1341              
1342             =item *
1343              
1344             Shadowcat Systems L
1345              
1346             =item *
1347              
1348             NET-A-PORTER L
1349              
1350             =item *
1351              
1352             Cisco L
1353              
1354             =back
1355              
1356             =head1 AUTHOR
1357              
1358             Paul Evans
1359              
1360             =cut
1361              
1362             0x55AA;