File Coverage

blib/lib/HTTP/Async.pm
Criterion Covered Total %
statement 309 336 91.9
branch 82 116 70.6
condition 48 80 60.0
subroutine 46 53 86.7
pod 27 27 100.0
total 512 612 83.6


line stmt bran cond sub pod time code
1 34     34   962162 use strict;
  34         52  
  34         819  
2 34     34   105 use warnings;
  34         36  
  34         1333  
3              
4             package HTTP::Async;
5              
6             our $VERSION = '0.31';
7              
8 34     34   122 use Carp;
  34         40  
  34         1953  
9 34     34   16239 use Data::Dumper;
  34         160821  
  34         2033  
10 34     34   13937 use HTTP::Response;
  34         257112  
  34         975  
11 34     34   17076 use IO::Select;
  34         40353  
  34         1562  
12 34     34   14794 use Net::HTTP::NB;
  34         1072547  
  34         329  
13 34     34   15159 use Net::HTTP;
  34         56  
  34         218  
14 34     34   22069 use URI;
  34         49  
  34         724  
15 34     34   15268 use Time::HiRes qw( time sleep );
  34         31924  
  34         130  
16              
17             =head1 NAME
18              
19             HTTP::Async - process multiple HTTP requests in parallel without blocking.
20              
21             =head1 SYNOPSIS
22              
23             Create an object and add some requests to it:
24              
25             use HTTP::Async;
26             my $async = HTTP::Async->new;
27            
28             # create some requests and add them to the queue.
29             $async->add( HTTP::Request->new( GET => 'http://www.perl.org/' ) );
30             $async->add( HTTP::Request->new( GET => 'http://www.ecclestoad.co.uk/' ) );
31              
32             and then EITHER process the responses as they come back:
33              
34             while ( my $response = $async->wait_for_next_response ) {
35             # Do some processing with $response
36             }
37            
38             OR do something else if there is no response ready:
39            
40             while ( $async->not_empty ) {
41             if ( my $response = $async->next_response ) {
42             # deal with $response
43             } else {
44             # do something else
45             }
46             }
47              
48             OR just use the async object to fetch stuff in the background and deal with
49             the responses at the end.
50              
51             # Do some long code...
52             for ( 1 .. 100 ) {
53             some_function();
54             $async->poke; # lets it check for incoming data.
55             }
56              
57             while ( my $response = $async->wait_for_next_response ) {
58             # Do some processing with $response
59             }
60              
61             =head1 DESCRIPTION
62              
63             Although using the conventional C<LWP::UserAgent> is fast and easy it does
64             have some drawbacks - the code execution blocks until the request has been
65             completed and it is only possible to process one request at a time.
66             C<HTTP::Async> attempts to address these limitations.
67              
68             It gives you a 'Async' object that you can add requests to, and then get the
69             requests off as they finish. The actual sending and receiving of the requests
70             is abstracted. As soon as you add a request it is transmitted, if there are
71             too many requests in progress at the moment they are queued. There is no
72             concept of starting or stopping - it runs continuously.
73              
74             Whilst it is waiting to receive data it returns control to the code that
75             called it meaning that you can carry out processing whilst fetching data from
76             the network. All without forking or threading - it is actually done using
77             C<select> lists.
78              
79             =head1 Default settings:
80              
81             There are a number of default settings that should be suitable for most uses.
82             However in some circumstances you might wish to change these.
83              
84             slots: 20
85             timeout: 180 (seconds)
86             max_request_time: 300 (seconds)
87             max_redirect: 7
88             poll_interval: 0.05 (seconds)
89             proxy_host: ''
90             proxy_port: ''
91             local_addr: ''
92             local_port: ''
93             ssl_options: {}
94             cookie_jar: undef
95             peer_addr: ''
96              
97             If defined, is expected to be similar to C<HTTP::Cookies>, with extract_cookies and add_cookie_header methods.
98              
99             The option max_redirects has been renamed to max_redirect to be consistent with LWP::UserAgent, although max_redirects still works.
100              
101            
102             =head1 METHODS
103              
104             =head2 new
105              
106             my $async = HTTP::Async->new( %args );
107              
108             Creates a new HTTP::Async object and sets it up. Variations from the default
109             can be set by passing them in as C<%args>.
110              
111             =cut
112              
113             sub new {
114 30     30 1 391243 my $class = shift;
115 30         833 my $self = bless {
116              
117             opts => {
118             slots => 20,
119             max_redirect => 7,
120             timeout => 180,
121             max_request_time => 300,
122             poll_interval => 0.05,
123             cookie_jar => undef,
124             },
125              
126             id_opts => {},
127              
128             to_send => [],
129             in_progress => {},
130             to_return => [],
131              
132             current_id => 0,
133             fileno_to_id => {},
134             }, $class;
135              
136 30         200 $self->_init(@_);
137              
138 29         80 return $self;
139             }
140              
141             sub _init {
142 36     36   71 my $self = shift;
143 36         92 my %args = @_;
144 36         188 $self->_set_opt( $_ => $args{$_} ) for sort keys %args;
145 35         72 return $self;
146             }
147              
148 39     39   353 sub _next_id { return ++$_[0]->{current_id} }
149              
150             =head2 slots, timeout, max_request_time, poll_interval, max_redirect, proxy_host, proxy_port, local_addr, local_port, ssl_options, cookie_jar, peer_addr
151              
152             $old_value = $async->slots;
153             $new_value = $async->slots( $new_value );
154              
155             Get/setters for the C<$async> objects config settings. Timeout is for
156             inactivity and is in seconds.
157              
158             Slots is the maximum number of parallel requests to make.
159              
160             =cut
161              
162             my %GET_SET_KEYS = map { $_ => 1 } qw( slots poll_interval
163             timeout max_request_time max_redirect
164             proxy_host proxy_port local_addr local_port ssl_options cookie_jar peer_addr);
165              
166             sub _add_get_set_key {
167 6     6   6 my $class = shift;
168 6         7 my $key = shift;
169 6         10 $GET_SET_KEYS{$key} = 1;
170             }
171              
172             my %KEY_ALIASES = ( max_redirects => 'max_redirect' );
173              
174             sub _get_opt {
175 1757     1757   1868 my $self = shift;
176 1757         2585 my $key = shift;
177 1757         1572 my $id = shift;
178              
179 1757 50       3843 $key = $KEY_ALIASES{$key} if exists $KEY_ALIASES{$key};
180              
181 1757 50       3951 die "$key not valid for _get_opt" unless $GET_SET_KEYS{$key};
182              
183             # If there is an option set for this id then use that, otherwise fall back
184             # to the defaults.
185             return $self->{id_opts}{$id}{$key}
186 1757 100 100     5974 if $id && defined $self->{id_opts}{$id}{$key};
187              
188 1750         34891451 return $self->{opts}{$key};
189              
190             }
191              
192             sub _set_opt {
193 24     24   82 my $self = shift;
194 24         37 my $key = shift;
195              
196 24 100       73 $key = $KEY_ALIASES{$key} if exists $KEY_ALIASES{$key};
197              
198 24 100       79 die "$key not valid for _set_opt" unless $GET_SET_KEYS{$key};
199 23 50       109 $self->{opts}{$key} = shift if @_;
200 23         79 return $self->{opts}{$key};
201             }
202              
203             foreach my $key ( keys %GET_SET_KEYS ) {
204 1 50   1 1 101 eval "
  1 0   0 1 13  
  0 0   0 1 0  
  0 100   18 1 0  
  0 100   25 1 0  
  0 0   0 1 0  
  18 100   658 1 5469  
  18 0   0 1 81  
  25 0   0 1 41  
  25 50   39 1 137  
  0 0   0 1 0  
  0 100   4 1 0  
  658         1504  
  658         3830  
  0         0  
  0         0  
  0         0  
  0         0  
  39         71  
  39         225  
  0         0  
  0         0  
  4         3331  
  4         27  
205             sub $key {
206             my \$self = shift;
207             return scalar \@_
208             ? \$self->_set_opt( '$key', \@_ )
209             : \$self->_get_opt( '$key' );
210             }
211             ";
212             }
213              
214             =head2 add
215              
216             my @ids = $async->add(@requests);
217             my $first_id = $async->add(@requests);
218              
219             Adds requests to the queues. Each request is given an unique integer id (for
220             this C<$async>) that can be used to track the requests if needed. If called in
221             list context an array of ids is returned, in scalar context the id of the
222             first request added is returned.
223              
224             =cut
225              
226             sub add {
227 29     29 1 232683 my $self = shift;
228 29         67 my @returns = ();
229              
230 29         155 foreach my $req (@_) {
231 36         191 push @returns, $self->add_with_opts( $req, {} );
232             }
233              
234 29 50       221 return wantarray ? @returns : $returns[0];
235             }
236              
237             =head2 add_with_opts
238              
239             my $id = $async->add_with_opts( $request, \%opts );
240              
241             This method lets you add a single request to the queue with options that
242             differ from the defaults. For example you might wish to set a longer timeout
243             or to use a specific proxy. Returns the id of the request.
244              
245             The method croaks when passed an invalid option.
246              
247             =cut
248              
249             sub add_with_opts {
250 33     33 1 9208 my $self = shift;
251 33         45 my $req = shift;
252 33         39 my $opts = shift;
253              
254 33         47 for my $key (keys %{$opts}) {
  33         137  
255 6 100       147 croak "$key not valid for add_with_opts" unless $GET_SET_KEYS{$key};
256             }
257              
258 32         99 my $id = $self->_next_id;
259              
260 32         49 push @{ $$self{to_send} }, [ $req, $id ];
  32         105  
261 32         223 $self->{id_opts}{$id} = $opts;
262 32         76 $self->poke;
263              
264 32         109 return $id;
265             }
266              
267             =head2 poke
268              
269             $async->poke;
270              
271             At fairly frequent intervals some housekeeping needs to performed - such as
272             reading received data and starting new requests. Calling C<poke> lets the
273             object do this and then return quickly. Usually you will not need to use this
274             as most other methods do it for you.
275              
276             You should use C<poke> if your code is spending time elsewhere (ie not using
277             the async object) to allow it to keep the data flowing over the network. If it
278             is not used then the buffers may fill up and completed responses will not be
279             replaced with new requests.
280              
281             =cut
282              
283             sub poke {
284 10460     10460 1 12932 my $self = shift;
285              
286 10460         15909 $self->_process_in_progress;
287 10460         12664 $self->_process_to_send;
288              
289 10460         9237 return 1;
290             }
291              
292             =head2 next_response
293              
294             my $response = $async->next_response;
295             my ( $response, $id ) = $async->next_response;
296              
297             Returns the next response (as a L<HTTP::Response> object) that is waiting, or
298             returns undef if there is none. In list context it returns a (response, id)
299             pair, or an empty list if none. Does not wait for a response so returns very
300             quickly.
301              
302             =cut
303              
304             sub next_response {
305 11     11 1 35 my $self = shift;
306 11         63 return $self->_next_response(0);
307             }
308              
309             =head2 wait_for_next_response
310              
311             my $response = $async->wait_for_next_response( 3.5 );
312             my ( $response, $id ) = $async->wait_for_next_response( 3.5 );
313              
314             As C<next_response> but only returns if there is a next response or the time
315             in seconds passed in has elapsed. If no time is given then it blocks. Whilst
316             waiting it checks the queues every C<poll_interval> seconds. The times can be
317             fractional seconds.
318              
319             =cut
320              
321             sub wait_for_next_response {
322 31     31 1 10588 my $self = shift;
323 31         44 my $wait_for = shift;
324              
325 31 100       773 $wait_for = $self->max_request_time
326             if !defined $wait_for;
327              
328 31         85 return $self->_next_response($wait_for);
329             }
330              
331             sub _next_response {
332 42     42   55 my $self = shift;
333 42   100     139 my $wait_for = shift || 0;
334 42         133 my $end_time = time + $wait_for;
335 42         54 my $resp_and_id = undef;
336              
337 42         141 while ( !$self->empty ) {
338 695         862 $resp_and_id = shift @{ $$self{to_return} };
  695         1865  
339              
340             # last if we have a response or we have run out of time.
341             last
342 695 100 100     4103 if $resp_and_id
343             || time > $end_time;
344              
345             # sleep for the default sleep time.
346             # warn "sleeping for " . $self->poll_interval;
347 655         36324 sleep $self->poll_interval;
348             }
349              
350             # If there is no result return false.
351 42 100       147 return unless $resp_and_id;
352              
353             # We have a response - delete the options for it from the store.
354 33         97 delete $self->{id_opts}{ $resp_and_id->[1] };
355              
356             # If we have a result return list or response depending on
357             # context.
358             return wantarray
359 33 50       196 ? @$resp_and_id
360             : $resp_and_id->[0];
361             }
362              
363             =head2 to_send_count
364              
365             my $pending = $async->to_send_count;
366              
367             Returns the number of items which have been added but have not yet started being processed.
368              
369             =cut
370              
371             sub to_send_count {
372 543     543 1 795 my $self = shift;
373 543         1923 $self->poke;
374 543         658 return scalar @{ $self->{to_send} };
  543         2318  
375             }
376              
377             =head2 to_return_count
378              
379             my $completed = $async->to_return_count;
380              
381             Returns the number of items which have completed transferring, and are waiting to be returned by next_response().
382              
383             =cut
384              
385             sub to_return_count {
386 4843     4843 1 3532 my $self = shift;
387 4843         5210 $self->poke;
388 4843         3007 return scalar @{ $self->{to_return} };
  4843         6162  
389             }
390              
391             =head2 in_progress_count
392              
393             my $running = $async->in_progress_count;
394              
395             Returns the number of items which are currently being processed asynchronously.
396              
397             =cut
398              
399             sub in_progress_count {
400 745     745 1 976 my $self = shift;
401 745         1669 $self->poke;
402 745         805 return scalar keys %{ $self->{in_progress} };
  745         2990  
403             }
404              
405             =head2 total_count
406              
407             my $total = $async->total_count;
408              
409             Returns the sum of the to_send_count, in_progress_count and to_return_count.
410              
411             This should be the total number of items which have been added that have not yet been returned by next_response().
412              
413             =cut
414              
415             sub total_count {
416 742     742 1 2866 my $self = shift;
417              
418 742         3710 my $count = 0 #
419             + $self->to_send_count #
420             + $self->in_progress_count #
421             + $self->to_return_count;
422              
423 742         3096 return $count;
424             }
425              
426             =head2 info
427              
428             print $async->info;
429              
430             Prints a line describing what the current state is.
431              
432             =cut
433              
434             sub info {
435 0     0 1 0 my $self = shift;
436              
437 0         0 return sprintf(
438             "HTTP::Async status: %4u,%4u,%4u (send, progress, return)\n",
439             $self->to_send_count, #
440             $self->in_progress_count, #
441             $self->to_return_count
442             );
443             }
444              
445             =head2 remove
446              
447             $async->remove($id);
448             my $success = $async->remove($id);
449              
450             Removes the item with the given id no matter which state it is currently in. Returns true if an item is removed, and false otherwise.
451              
452             =cut
453              
454             sub remove {
455 8     8 1 1240 my $self = shift;
456 8         10 my $id = shift;
457              
458 8         14 my $hashref = delete $self->{in_progress}{$id};
459 8 100       25 if (!$hashref) {
460 5         24 for my $list ('to_send', 'to_return') {
461 10         9 my ($r_and_id) = grep { $_->[1] eq $id } @{ $self->{$list} };
  3         9  
  10         21  
462 10         14 $hashref = $r_and_id->[0];
463 10 100       18 if ($hashref) {
464 3         10 @{ $self->{$list} }
465 3         5 = grep { $_->[1] ne $id } @{ $self->{$list} };
  3         7  
  3         6  
466             }
467             }
468             }
469 8 100       28 return if !$hashref;
470              
471 6         10 my $s = $hashref->{handle};
472 6         13 $self->_io_select->remove($s);
473 6         173 delete $self->{id_opts}{$id};
474              
475 6         250 return 1;
476             }
477              
478             =head2 remove_all
479              
480             $async->remove_all;
481             my $success = $async->remove_all;
482              
483             Removes all items no matter what states they are currently in. Returns true if any items are removed, and false otherwise.
484              
485             =cut
486              
487             sub remove_all {
488 2     2 1 4 my $self = shift;
489 2 100       8 return if $self->empty;
490              
491             my @ids = (
492 0         0 (map { $_->[1] } @{ $self->{to_send} }),
  1         4  
493 1         3 (keys %{ $self->{in_progress} }),
494 1         2 (map { $_->[1] } @{ $self->{to_return} }),
  1         7  
  1         6  
495             );
496              
497 1         9 for my $id (@ids) {
498 2         8 $self->remove($id);
499             }
500              
501 1         6 return 1;
502             }
503              
504             =head2 empty, not_empty
505              
506             while ( $async->not_empty ) { ...; }
507             while (1) { ...; last if $async->empty; }
508              
509             Returns true or false depending on whether there are request or responses
510             still on the object.
511              
512             =cut
513              
514             sub empty {
515 702     702 1 2824 my $self = shift;
516 702 100       2965 return $self->total_count ? 0 : 1;
517             }
518              
519             sub not_empty {
520 3     3 1 2369 my $self = shift;
521 3         7 return !$self->empty;
522             }
523              
524             =head2 DESTROY
525              
526             The destroy method croaks if an object is destroyed but is not empty. This is
527             to help with debugging.
528              
529             =cut
530              
531             sub DESTROY {
532 30     30   9376807 my $self = shift;
533 30         66 my $class = ref $self;
534              
535 30 50       98 carp "$class object destroyed but still in use"
536             if $self->total_count;
537              
538             carp "$class INTERNAL ERROR: 'id_opts' not empty"
539 30 50       40 if scalar keys %{ $self->{id_opts} };
  30         112  
540              
541 30         1103 return;
542             }
543              
544             # Go through all the values on the select list and check to see if
545             # they have been fully received yet.
546              
547             sub _process_in_progress {
548 10460     10460   7079 my $self = shift;
549 10460         10242 my %seen_ids = ();
550              
551             HANDLE:
552 10460         12503 foreach my $s ( $self->_io_select->can_read(0) ) {
553              
554             # Get the id and add it to the hash of seen ids so we don't check it
555             # later for errors.
556 234   50     7329 my $id = $self->{fileno_to_id}{ $s->fileno }
557             || die "INTERNAL ERROR: could not got id for fileno";
558 234         2058 $seen_ids{$id}++;
559              
560 234         361 my $hashref = $$self{in_progress}{$id};
561 234   100     746 my $tmp = $hashref->{tmp} ||= {};
562              
563             # warn Dumper $hashref;
564              
565             # Check that we have not timed-out.
566 234 100 66     1685 if ( time > $hashref->{timeout_at}
567             || time > $hashref->{finish_by} )
568             {
569              
570             # warn sprintf "Timeout: %.3f > %.3f", #
571             # time, $hashref->{timeout_at};
572              
573             $self->_add_error_response_to_return(
574             id => $id,
575             code => 504,
576             request => $hashref->{request},
577             previous => $hashref->{previous},
578 1         11 content => 'Timed out',
579             );
580              
581 1         7 $self->_io_select->remove($s);
582 1         35 delete $$self{fileno_to_id}{ $s->fileno };
583 1         11 next HANDLE;
584             }
585              
586             # If there is a code then read the body.
587 233 100       621 if ( $$tmp{code} ) {
588 169         261 my $buf;
589 169         634 my $n = $s->read_entity_body( $buf, 1024 * 16 ); # 16kB
590 169 100       10458 $$tmp{is_complete} = 1 unless $n;
591 169         423 $$tmp{content} .= $buf;
592              
593             # warn "Received " . length( $buf ) ;
594              
595             # warn $buf;
596             }
597              
598             # If no code try to read the headers.
599             else {
600 64         418 $s->flush;
601              
602 64         95 my ( $code, $message, %headers );
603              
604 64         104 eval {
605 64         304 ( $code, $message, %headers ) =
606             $s->read_response_headers( laxed => 1, junk_out => [] );
607             };
608              
609 64 100       14747 if ($@) {
610             $self->_add_error_response_to_return(
611             'code' => 504,
612             'content' => $@,
613             'id' => $id,
614             'request' => $hashref->{request},
615             'previous' => $hashref->{previous}
616 1         14 );
617 1         4 $self->_io_select->remove($s);
618 1         52 delete $$self{fileno_to_id}{ $s->fileno };
619 1         11 next HANDLE;
620             }
621              
622 63 50       177 if ($code) {
623              
624             # warn "Got headers: $code $message " . time;
625              
626 63         163 $$tmp{code} = $code;
627 63         195 $$tmp{message} = $message;
628 63         180 my @headers_array = map { $_, $headers{$_} } keys %headers;
  239         429  
629 63         242 $$tmp{headers} = \@headers_array;
630              
631             }
632             }
633              
634             # Reset the timeout.
635 232         781 $hashref->{timeout_at} = time + $self->_get_opt( 'timeout', $id );
636             # warn "recieved - timeout set to '$hashref->{timeout_at}'";
637              
638             # If the message is complete then create a request and add it
639             # to 'to_return';
640 232 100       625 if ( $$tmp{is_complete} ) {
641 59         140 delete $$self{fileno_to_id}{ $s->fileno };
642 59         319 $self->_io_select->remove($s);
643              
644             # warn Dumper $$hashref{content};
645              
646             my $response = HTTP::Response->new(
647 59         2449 @$tmp{ 'code', 'message', 'headers', 'content' } );
648              
649 59         9084 $response->request( $hashref->{request} );
650 59 100       466 $response->previous( $hashref->{previous} ) if $hashref->{previous};
651              
652             # Deal with cookies
653 59         235 my $jar = $self->_get_opt('cookie_jar', $id);
654 59 100       113 if ($jar) {
655 1         9 $jar->extract_cookies($response);
656             }
657              
658             # If it was a redirect and there are still redirects left
659             # create a new request and unshift it onto the 'to_send'
660             # array.
661             # Only redirect GET and HEAD as per RFC 2616.
662             # http://www.w3.org/Protocols/rfc2616/rfc2616-sec10.html
663 59         505 my $code = $response->code;
664 59         664 my $get_or_head = $response->request->method =~ m{^(?:GET|HEAD)$};
665              
666 59 100 100     1012 if (
      33        
      66        
      66        
      66        
667             $response->is_redirect # is a redirect
668             && $hashref->{redirects_left} > 0 # and we still want to follow
669             && ($get_or_head || $code !~ m{^30[127]$}) # must be GET or HEAD if it's 301, 302 or 307
670             && $code != 304 # not a 'not modified' reponse
671             && $code != 305 # not a 'use proxy' response
672             )
673             {
674              
675 30         421 $hashref->{redirects_left}--;
676              
677 30         77 my $loc = $response->header('Location');
678 30         864 my $uri = $response->request->uri;
679              
680 30 50 33     324 warn "Problem: " . Dumper( { loc => $loc, uri => $uri } )
      33        
      33        
681             unless $uri && ref $uri && $loc && !ref $loc;
682              
683 30         354 my $url = _make_url_absolute( url => $loc, ref => $uri );
684              
685 30         4902 my $request = $response->request->clone;
686 30         3083 $request->uri($url);
687              
688             # These headers should never be forwarded
689 30         1484 $request->remove_header('Host', 'Cookie');
690              
691             # Don't leak private information.
692             # http://www.w3.org/Protocols/rfc2616/rfc2616-sec15.html#sec15.1.3
693 30 0 33     635 if ($request->header('Referer') &&
      33        
694             $hashref->{request}->uri->scheme eq 'https' &&
695             $request->uri->scheme eq 'http') {
696              
697 0         0 $request->remove_header('Referer');
698             }
699              
700             # See Other should use GET
701 30 50 33     731 if ($code == 303 && !$get_or_head) {
702 0         0 $request->method('GET');
703 0         0 $request->content('');
704 0         0 $request->remove_content_headers;
705             }
706              
707 30         81 $self->_send_request( [ $request, $id ] );
708 30         66 $hashref->{previous} = $response;
709             }
710             else {
711 29         494 $self->_add_to_return_queue( [ $response, $id ] );
712 29         79 delete $$self{in_progress}{$id};
713             }
714              
715 59         320 delete $hashref->{tmp};
716             }
717             }
718              
719             # warn Dumper(
720             # {
721             # in_progress => $self->{in_progress},
722             # seen_ids => \%seen_ids,
723             # }
724             # );
725              
726 10460         59048 foreach my $id ( keys %{ $self->{in_progress} } ) {
  10460         16958  
727              
728             # skip this one if it was processed above.
729 9681 100       13122 next if $seen_ids{$id};
730              
731 9478         7324 my $hashref = $self->{in_progress}{$id};
732              
733 9478 100 100     38871 if ( time > $hashref->{timeout_at}
734             || time > $hashref->{finish_by} )
735             {
736              
737             # warn Dumper( { hashref => $hashref, now => time } );
738              
739             # we have a request that has timed out - handle it
740             $self->_add_error_response_to_return(
741             id => $id,
742             code => 504,
743             request => $hashref->{request},
744             previous => $hashref->{previous},
745 3         44 content => 'Timed out',
746             );
747              
748 3         9 my $s = $hashref->{handle};
749 3         10 $self->_io_select->remove($s);
750 3         191 delete $$self{fileno_to_id}{ $s->fileno };
751             }
752             }
753              
754 10460         10822 return 1;
755             }
756              
757             sub _add_to_return_queue {
758 36     36   58 my $self = shift;
759 36         43 my $req_and_id = shift;
760 36         48 push @{ $$self{to_return} }, $req_and_id;
  36         93  
761 36         97 return 1;
762             }
763              
764             # Add all the items waiting to be sent to 'to_send' up to the 'slots'
765             # limit.
766              
767             sub _process_to_send {
768 10460     10460   7445 my $self = shift;
769              
770 10460   66     7111 while ( scalar @{ $$self{to_send} }
  10499         20851  
771 39         178 && $self->slots > scalar keys %{ $$self{in_progress} } )
772             {
773 39         57 $self->_send_request( shift @{ $$self{to_send} } );
  39         164  
774             }
775              
776 10460         8413 return 1;
777             }
778              
# Open a connection for one queued request, write the request to it and record
# the bookkeeping needed to track it while it is in progress.
#
# Takes an arrayref of [ $request, $id ].
#
# If the connection object cannot be created, a 503 error response carrying
# the error text is queued for return instead and the sub returns 1. If the
# request cannot be written to the connection the sub croaks. Otherwise the
# handle is added to the IO::Select set, an 'in_progress' entry is filled in
# for $id, and 1 is returned.
sub _send_request {
    my $self     = shift;
    my $r_and_id = shift;
    my ( $request, $id ) = @$r_and_id;

    my $uri = URI->new( $request->uri );

    my %args = ();

    # Get cookies from jar if one exists
    my $jar = $self->_get_opt( 'cookie_jar', $id );
    if ($jar) {
        $jar->add_cookie_header($request);
    }

    # We need to use a different request_uri for proxied requests. Decide to use
    # this if a proxy port or host is set.
    #
    # http://www.w3.org/Protocols/rfc2616/rfc2616-sec5.html#sec5.1.2
    $args{Host}      = $uri->host;
    $args{PeerAddr}  = $self->_get_opt( 'proxy_host', $id );
    $args{PeerPort}  = $self->_get_opt( 'proxy_port', $id );
    $args{LocalAddr} = $self->_get_opt( 'local_addr', $id );
    $args{LocalPort} = $self->_get_opt( 'local_port', $id );

    # https://rt.cpan.org/Public/Bug/Display.html?id=33071
    $args{Timeout} = $self->_get_opt( 'timeout', $id );

    # ACF - Pass ssl_options through
    $args{ssl_opts} = $self->_get_opt( 'ssl_options', $id );

    my $request_is_to_proxy =
      ( $args{PeerAddr} || $args{PeerPort} )    # if either are set...
      ? 1                                       # ...then we are a proxy request
      : 0;                                      # ...otherwise not

    # If we did not get a setting from the proxy then use the uri values.
    # An explicit 'peer_addr' option overrides whichever address was chosen.
    $args{PeerAddr} ||= $uri->host;
    my $peer_address = $self->_get_opt( 'peer_addr', $id );
    if ($peer_address) {
        $args{PeerAddr} = $peer_address;
    }

    $args{PeerPort} ||= $uri->port;

    my $is_https = ( $uri->scheme and $uri->scheme eq 'https' ) ? 1 : 0;

    my $net_http_class = 'Net::HTTP::NB';
    if ( $is_https and not $request_is_to_proxy ) {
        $net_http_class = 'Net::HTTPS::NB';

        # Load the https class on demand; test the eval's own result rather
        # than $@ so that a clobbered $@ cannot hide a failure.
        my $loaded = eval {
            require Net::HTTPS::NB;
            Net::HTTPS::NB->import();
            1;
        };
        die "$net_http_class must be installed for https support"
          unless $loaded;

        # Add SSL options, if any, to args. Reuse the value looked up above
        # with $id so that options set for this particular request are
        # honoured here too, not only the object-wide ones.
        my $ssl_options = $args{ssl_opts};
        @args{ keys %$ssl_options } = values %$ssl_options if $ssl_options;
    }
    elsif ( $is_https and $request_is_to_proxy ) {

        # We are making an HTTPS request through an HTTP proxy such as squid.
        # The proxy will handle the HTTPS, we need to connect to it via HTTP
        # and then make a request where the https is clear from the scheme...
        $args{Host} = sprintf( '%s:%s', delete @args{ 'PeerAddr', 'PeerPort' } );
    }

    my $s = eval { $net_http_class->new(%args) };

    # We could not create a request - fake up a 503 response with
    # error as content.
    if ( !$s ) {

        $self->_add_error_response_to_return(
            id       => $id,
            code     => 503,
            request  => $request,
            previous => $$self{in_progress}{$id}{previous},
            content  => $@,
        );

        return 1;
    }

    # Copy the request headers into a plain hash, using the public headers
    # accessor rather than reaching into the request object's internals.
    my %headers;
    for my $key ( $request->headers->header_field_names ) {
        $headers{$key} = $request->header($key);
    }

    # Decide what to use as the request_uri
    my $request_uri =
      $request_is_to_proxy               # is this a proxy request....
      ? $uri->as_string                  # ... if so use full url
      : _strip_host_from_uri($uri);      # ...else strip off scheme, host and port

    croak "Could not write request to $uri '$!'"
      unless $s->write_request( $request->method, $request_uri, %headers,
        $request->content );

    $self->_io_select->add($s);

    my $time = time;

    # Reuse an existing entry for this id if there is one, so that values
    # already stored on it (such as 'redirects_left') are kept.
    my $entry = $$self{in_progress}{$id} ||= {};

    $$self{fileno_to_id}{ $s->fileno } = $id;

    $entry->{request}    = $request;
    $entry->{started_at} = $time;

    # Two deadlines are recorded: one from the 'timeout' option and one from
    # the 'max_request_time' option.
    $entry->{timeout_at} = $time + $self->_get_opt( 'timeout', $id );

    # warn "sent - timeout set to '$entry->{timeout_at}'";

    $entry->{finish_by} = $time + $self->_get_opt( 'max_request_time', $id );
    $entry->{handle}    = $s;

    $entry->{redirects_left} = $self->_get_opt( 'max_redirect', $id )
      unless exists $entry->{redirects_left};

    return 1;
}
900              
# Return the part of a URI that follows the scheme and authority, always
# starting with a '/'.
#
# Takes an object providing scheme, authority and as_string methods (a URI
# object in this file) and returns a plain string. An empty remainder becomes
# '/'.
sub _strip_host_from_uri {
    my $uri = shift;

    my $scheme_and_auth = quotemeta( $uri->scheme . '://' . $uri->authority );
    my $url             = $uri->as_string;

    # scheme() reports the scheme lowercased while as_string keeps whatever
    # case the caller used, so the prefix has to be matched case-insensitively
    # or 'HTTP://host/path' would be left unstripped.
    $url =~ s/\A$scheme_and_auth//i;
    $url = "/$url" unless $url =~ m{\A/};

    return $url;
}
912              
# Return the IO::Select object stored under 'io_select', creating and storing
# a new one the first time it is asked for.
sub _io_select {
    my ($self) = @_;

    $self->{io_select} = IO::Select->new() unless $self->{io_select};

    return $self->{io_select};
}
917              
# Resolve a URL against a reference URL and return the result as a string.
#
# Called with named arguments:
#   url - the URL to resolve
#   ref - the URL it is resolved against
sub _make_url_absolute {
    my %args = @_;
    my ( $relative, $base ) = @args{qw( url ref )};

    my $absolute = URI->new_abs( $relative, $base );

    return $absolute->as_string;
}
926              
# Build an error response, queue it for return and drop the matching
# 'in_progress' entry.
#
# Called with named arguments:
#   id       - id of the request the response belongs to
#   code     - status code for the response; the message is looked up from it
#   request  - request object to attach to the response
#   previous - optional earlier response to attach as 'previous'
#   content  - body text for the response
#
# Returns the newly created response object.
sub _add_error_response_to_return {
    my ( $self, %args ) = @_;

    use HTTP::Status;

    my $code    = $args{code};
    my $message = status_message($code);

    my $response = HTTP::Response->new( $code, $message, undef, $args{content} );
    $response->request( $args{request} );

    if ( $args{previous} ) {
        $response->previous( $args{previous} );
    }

    # Hand the response over and stop tracking the request.
    $self->_add_to_return_queue( [ $response, $args{id} ] );
    delete $self->{in_progress}{ $args{id} };

    return $response;
}
946              
947             =head1 SEE ALSO
948              
949             L<HTTP::Async::Polite> - a polite form of this module. Slows the scraping down
950             by domain so that the remote server is not overloaded.
951              
952             =head1 GOTCHAS
953              
954             The responses may not come back in the same order as the requests were made.
955             For https requests to work, you must have L<Net::HTTPS::NB> installed.
956              
957             =head1 THANKS
958              
959             Egor Egorov contributed patches for proxies, catching connections that die
960             before headers sent and more.
961              
962             Tomohiro Ikebe from livedoor.jp submitted patches (and a test) to properly
963             handle 304 responses.
964              
965             Naveed Massjouni for adding the https handling code.
966              
967             Alex Balhatchet for adding the https + proxy handling code, and for making the
968             tests run ok in parallel.
969              
970             Josef Toman for fixing two bugs, one related to header handling and another
971             related to producing an absolute URL correctly.
972              
973             Github user 'c00ler-' for adding LocalAddr and LocalPort support.
974              
975             rt.cpan.org user 'Florian (fschlich)' for typo in documentation.
976              
977             Heikki Vatiainen for the ssl_options support patch.
978              
979             Daniel Lintott of the Debian Perl Group for pointing out a test failure when
980             using a very recent version of HTTP::Server::Simple to implement
981             t/TestServer.pm
982              
983             =head1 BUGS AND REPO
984              
985             Please submit all bugs, patches etc on github
986              
987             L<https://github.com/evdb/HTTP-Async>
988              
989             =head1 AUTHOR
990              
991             Edmund von der Burg C<< <evdb@ecclestoad.co.uk> >>.
992              
993             L<http://www.ecclestoad.co.uk/>
994              
995             =head1 LICENCE AND COPYRIGHT
996              
997             Copyright (c) 2006, Edmund von der Burg C<< <evdb@ecclestoad.co.uk> >>.
998             All rights reserved.
999              
1000             This module is free software; you can redistribute it and/or modify it under
1001             the same terms as Perl itself.
1002              
1003             =head1 DISCLAIMER OF WARRANTY
1004              
1005             BECAUSE THIS SOFTWARE IS LICENSED FREE OF CHARGE, THERE IS NO WARRANTY FOR THE
1006             SOFTWARE, TO THE EXTENT PERMITTED BY APPLICABLE LAW. EXCEPT WHEN OTHERWISE
1007             STATED IN WRITING THE COPYRIGHT HOLDERS AND/OR OTHER PARTIES PROVIDE THE
1008             SOFTWARE "AS IS" WITHOUT WARRANTY OF ANY KIND, EITHER EXPRESSED OR IMPLIED,
1009             INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
1010             FITNESS FOR A PARTICULAR PURPOSE. THE ENTIRE RISK AS TO THE QUALITY AND
1011             PERFORMANCE OF THE SOFTWARE IS WITH YOU. SHOULD THE SOFTWARE PROVE DEFECTIVE,
1012             YOU ASSUME THE COST OF ALL NECESSARY SERVICING, REPAIR, OR CORRECTION.
1013              
1014             IN NO EVENT UNLESS REQUIRED BY APPLICABLE LAW OR AGREED TO IN WRITING WILL ANY
1015             COPYRIGHT HOLDER, OR ANY OTHER PARTY WHO MAY MODIFY AND/OR REDISTRIBUTE THE
1016             SOFTWARE AS PERMITTED BY THE ABOVE LICENCE, BE LIABLE TO YOU FOR DAMAGES,
1017             INCLUDING ANY GENERAL, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING
1018             OUT OF THE USE OR INABILITY TO USE THE SOFTWARE (INCLUDING BUT NOT LIMITED TO
1019             LOSS OF DATA OR DATA BEING RENDERED INACCURATE OR LOSSES SUSTAINED BY YOU OR
1020             THIRD PARTIES OR A FAILURE OF THE SOFTWARE TO OPERATE WITH ANY OTHER
1021             SOFTWARE), EVEN IF SUCH HOLDER OR OTHER PARTY HAS BEEN ADVISED OF THE
1022             POSSIBILITY OF SUCH DAMAGES.
1023              
1024             =cut
1025              
1026             1;
1027