File Coverage

blib/lib/HTTP/Async.pm
Criterion Covered Total %
statement 309 336 91.9
branch 82 116 70.6
condition 48 80 60.0
subroutine 46 53 86.7
pod 27 27 100.0
total 512 612 83.6


line stmt bran cond sub pod time code
1 34     34   941248 use strict;
  34         47  
  34         767  
2 34     34   103 use warnings;
  34         33  
  34         1249  
3              
4             package HTTP::Async;
5              
6             our $VERSION = '0.32';
7              
8 34     34   122 use Carp;
  34         40  
  34         1611  
9 34     34   14813 use Data::Dumper;
  34         145423  
  34         1558  
10 34     34   13205 use HTTP::Response;
  34         226894  
  34         784  
11 34     34   13826 use IO::Select;
  34         36339  
  34         1319  
12 34     34   12840 use Net::HTTP::NB;
  34         1001419  
  34         278  
13 34     34   13043 use Net::HTTP;
  34         52  
  34         202  
14 34     34   20469 use URI;
  34         53  
  34         631  
15 34     34   14074 use Time::HiRes qw( time sleep );
  34         29129  
  34         121  
16              
17             =head1 NAME
18              
19             HTTP::Async - process multiple HTTP requests in parallel without blocking.
20              
21             =head1 SYNOPSIS
22              
23             Create an object and add some requests to it:
24              
25             use HTTP::Async;
26             my $async = HTTP::Async->new;
27            
28             # create some requests and add them to the queue.
29             $async->add( HTTP::Request->new( GET => 'http://www.perl.org/' ) );
30             $async->add( HTTP::Request->new( GET => 'http://www.ecclestoad.co.uk/' ) );
31              
32             and then EITHER process the responses as they come back:
33              
34             while ( my $response = $async->wait_for_next_response ) {
35             # Do some processing with $response
36             }
37            
38             OR do something else if there is no response ready:
39            
40             while ( $async->not_empty ) {
41             if ( my $response = $async->next_response ) {
42             # deal with $response
43             } else {
44             # do something else
45             }
46             }
47              
48             OR just use the async object to fetch stuff in the background and deal with
49             the responses at the end.
50              
51             # Do some long code...
52             for ( 1 .. 100 ) {
53             some_function();
54             $async->poke; # lets it check for incoming data.
55             }
56              
57             while ( my $response = $async->wait_for_next_response ) {
58             # Do some processing with $response
59             }
60              
61             =head1 DESCRIPTION
62              
63             Although using the conventional C<LWP::UserAgent> is fast and easy it does
64             have some drawbacks - the code execution blocks until the request has been
65             completed and it is only possible to process one request at a time.
66             C<HTTP::Async> attempts to address these limitations.
67              
68             It gives you an 'Async' object that you can add requests to, and then get the
69             requests off as they finish. The actual sending and receiving of the requests
70             is abstracted. As soon as you add a request it is transmitted, if there are
71             too many requests in progress at the moment they are queued. There is no
72             concept of starting or stopping - it runs continuously.
73              
74             Whilst it is waiting to receive data it returns control to the code that
75             called it meaning that you can carry out processing whilst fetching data from
76             the network. All without forking or threading - it is actually done using
77             C<select> lists.
78              
79             =head1 Default settings:
80              
81             There are a number of default settings that should be suitable for most uses.
82             However in some circumstances you might wish to change these.
83              
84             slots: 20
85             timeout: 180 (seconds)
86             max_request_time: 300 (seconds)
87             max_redirect: 7
88             poll_interval: 0.05 (seconds)
89             proxy_host: ''
90             proxy_port: ''
91             local_addr: ''
92             local_port: ''
93             ssl_options: {}
94             cookie_jar: undef
95             peer_addr: ''
96              
97             If defined, C<cookie_jar> is expected to be similar to C<HTTP::Cookies>, with extract_cookies and add_cookie_header methods.
98              
99             The option max_redirects has been renamed to max_redirect to be consistent with LWP::UserAgent, although max_redirects still works.
100              
101            
102             =head1 METHODS
103              
104             =head2 new
105              
106             my $async = HTTP::Async->new( %args );
107              
108             Creates a new HTTP::Async object and sets it up. Variations from the default
109             can be set by passing them in as C<%args>.
110              
111             =cut
112              
113             sub new {
114 30     30 1 374356 my $class = shift;
115 30         830 my $self = bless {
116              
117             opts => {
118             slots => 20,
119             max_redirect => 7,
120             timeout => 180,
121             max_request_time => 300,
122             poll_interval => 0.05,
123             cookie_jar => undef,
124             },
125              
126             id_opts => {},
127              
128             to_send => [],
129             in_progress => {},
130             to_return => [],
131              
132             current_id => 0,
133             fileno_to_id => {},
134             }, $class;
135              
136 30         187 $self->_init(@_);
137              
138 29         87 return $self;
139             }
140              
141             sub _init {
142 36     36   92 my $self = shift;
143 36         104 my %args = @_;
144 36         175 $self->_set_opt( $_ => $args{$_} ) for sort keys %args;
145 35         73 return $self;
146             }
147              
148 39     39   315 sub _next_id { return ++$_[0]->{current_id} }
149              
150             =head2 slots, timeout, max_request_time, poll_interval, max_redirect, proxy_host, proxy_port, local_addr, local_port, ssl_options, cookie_jar, peer_addr
151              
152             $old_value = $async->slots;
153             $new_value = $async->slots( $new_value );
154              
155             Get/setters for the C<$async> object's config settings. Timeout is for
156             inactivity and is in seconds.
157              
158             Slots is the maximum number of parallel requests to make.
159              
160             =cut
161              
162             my %GET_SET_KEYS = map { $_ => 1 } qw( slots poll_interval
163             timeout max_request_time max_redirect
164             proxy_host proxy_port local_addr local_port ssl_options cookie_jar peer_addr);
165              
166             sub _add_get_set_key {
167 6     6   8 my $class = shift;
168 6         6 my $key = shift;
169 6         12 $GET_SET_KEYS{$key} = 1;
170             }
171              
172             my %KEY_ALIASES = ( max_redirects => 'max_redirect' );
173              
174             sub _get_opt {
175 1756     1756   1656 my $self = shift;
176 1756         1981 my $key = shift;
177 1756         1461 my $id = shift;
178              
179 1756 50       3258 $key = $KEY_ALIASES{$key} if exists $KEY_ALIASES{$key};
180              
181 1756 50       3353 die "$key not valid for _get_opt" unless $GET_SET_KEYS{$key};
182              
183             # If there is an option set for this id then use that, otherwise fall back
184             # to the defaults.
185             return $self->{id_opts}{$id}{$key}
186 1756 100 100     4958 if $id && defined $self->{id_opts}{$id}{$key};
187              
188 1749         34941886 return $self->{opts}{$key};
189              
190             }
191              
192             sub _set_opt {
193 24     24   68 my $self = shift;
194 24         35 my $key = shift;
195              
196 24 100       77 $key = $KEY_ALIASES{$key} if exists $KEY_ALIASES{$key};
197              
198 24 100       76 die "$key not valid for _set_opt" unless $GET_SET_KEYS{$key};
199 23 50       115 $self->{opts}{$key} = shift if @_;
200 23         81 return $self->{opts}{$key};
201             }
202              
203             foreach my $key ( keys %GET_SET_KEYS ) {
204 1 50   1 1 128 eval "
  1 0   0 1 17  
  0 0   0 1 0  
  0 100   18 1 0  
  0 100   25 1 0  
  0 0   0 1 0  
  18 100   659 1 4147  
  18 0   0 1 89  
  25 0   0 1 39  
  25 50   39 1 117  
  0 0   0 1 0  
  0 100   4 1 0  
  659         1217  
  659         3021  
  0         0  
  0         0  
  0         0  
  0         0  
  39         66  
  39         215  
  0         0  
  0         0  
  4         3182  
  4         56  
205             sub $key {
206             my \$self = shift;
207             return scalar \@_
208             ? \$self->_set_opt( '$key', \@_ )
209             : \$self->_get_opt( '$key' );
210             }
211             ";
212             }
213              
214             =head2 add
215              
216             my @ids = $async->add(@requests);
217             my $first_id = $async->add(@requests);
218              
219             Adds requests to the queues. Each request is given an unique integer id (for
220             this C<$async>) that can be used to track the requests if needed. If called in
221             list context an array of ids is returned, in scalar context the id of the
222             first request added is returned.
223              
224             =cut
225              
226             sub add {
227 29     29 1 162283 my $self = shift;
228 29         65 my @returns = ();
229              
230 29         164 foreach my $req (@_) {
231 36         151 push @returns, $self->add_with_opts( $req, {} );
232             }
233              
234 29 50       244 return wantarray ? @returns : $returns[0];
235             }
236              
237             =head2 add_with_opts
238              
239             my $id = $async->add_with_opts( $request, \%opts );
240              
241             This method lets you add a single request to the queue with options that
242             differ from the defaults. For example you might wish to set a longer timeout
243             or to use a specific proxy. Returns the id of the request.
244              
245             The method croaks when passed an invalid option.
246              
247             =cut
248              
249             sub add_with_opts {
250 33     33 1 8547 my $self = shift;
251 33         48 my $req = shift;
252 33         36 my $opts = shift;
253              
254 33         44 for my $key (keys %{$opts}) {
  33         134  
255 6 100       164 croak "$key not valid for add_with_opts" unless $GET_SET_KEYS{$key};
256             }
257              
258 32         100 my $id = $self->_next_id;
259              
260 32         82 push @{ $$self{to_send} }, [ $req, $id ];
  32         90  
261 32         232 $self->{id_opts}{$id} = $opts;
262 32         72 $self->poke;
263              
264 32         99 return $id;
265             }
266              
267             =head2 poke
268              
269             $async->poke;
270              
271             At fairly frequent intervals some housekeeping needs to performed - such as
272             reading received data and starting new requests. Calling C lets the
273             object do this and then return quickly. Usually you will not need to use this
274             as most other methods do it for you.
275              
276             You should use C if your code is spending time elsewhere (ie not using
277             the async object) to allow it to keep the data flowing over the network. If it
278             is not used then the buffers may fill up and completed responses will not be
279             replaced with new requests.
280              
281             =cut
282              
283             sub poke {
284 8329     8329 1 10104 my $self = shift;
285              
286 8329         9099 $self->_process_in_progress;
287 8329         9969 $self->_process_to_send;
288              
289 8329         7268 return 1;
290             }
291              
292             =head2 next_response
293              
294             my $response = $async->next_response;
295             my ( $response, $id ) = $async->next_response;
296              
297             Returns the next response (as a L object) that is waiting, or
298             returns undef if there is none. In list context it returns a (response, id)
299             pair, or an empty list if none. Does not wait for a response so returns very
300             quickly.
301              
302             =cut
303              
304             sub next_response {
305 11     11 1 13 my $self = shift;
306 11         63 return $self->_next_response(0);
307             }
308              
309             =head2 wait_for_next_response
310              
311             my $response = $async->wait_for_next_response( 3.5 );
312             my ( $response, $id ) = $async->wait_for_next_response( 3.5 );
313              
314             As C but only returns if there is a next response or the time
315             in seconds passed in has elapsed. If no time is given then it blocks. Whilst
316             waiting it checks the queues every c seconds. The times can be
317             fractional seconds.
318              
319             =cut
320              
321             sub wait_for_next_response {
322 31     31 1 9383 my $self = shift;
323 31         41 my $wait_for = shift;
324              
325 31 100       741 $wait_for = $self->max_request_time
326             if !defined $wait_for;
327              
328 31         72 return $self->_next_response($wait_for);
329             }
330              
331             sub _next_response {
332 42     42   54 my $self = shift;
333 42   100     118 my $wait_for = shift || 0;
334 42         122 my $end_time = time + $wait_for;
335 42         45 my $resp_and_id = undef;
336              
337 42         102 while ( !$self->empty ) {
338 696         761 $resp_and_id = shift @{ $$self{to_return} };
  696         1123  
339              
340             # last if we have a response or we have run out of time.
341             last
342 696 100 100     3583 if $resp_and_id
343             || time > $end_time;
344              
345             # sleep for the default sleep time.
346             # warn "sleeping for " . $self->poll_interval;
347 656         33139 sleep $self->poll_interval;
348             }
349              
350             # If there is no result return false.
351 42 100       136 return unless $resp_and_id;
352              
353             # We have a response - delete the options for it from the store.
354 33         100 delete $self->{id_opts}{ $resp_and_id->[1] };
355              
356             # If we have a result return list or response depending on
357             # context.
358             return wantarray
359 33 50       183 ? @$resp_and_id
360             : $resp_and_id->[0];
361             }
362              
363             =head2 to_send_count
364              
365             my $pending = $async->to_send_count;
366              
367             Returns the number of items which have been added but have not yet started being processed.
368              
369             =cut
370              
371             sub to_send_count {
372 544     544 1 628 my $self = shift;
373 544         1537 $self->poke;
374 544         444 return scalar @{ $self->{to_send} };
  544         1782  
375             }
376              
377             =head2 to_return_count
378              
379             my $completed = $async->to_return_count;
380              
381             Returns the number of items which have completed transferring, and are waiting to be returned by next_response().
382              
383             =cut
384              
385             sub to_return_count {
386 3777     3777 1 2783 my $self = shift;
387 3777         3759 $self->poke;
388 3777         2445 return scalar @{ $self->{to_return} };
  3777         4288  
389             }
390              
391             =head2 in_progress_count
392              
393             my $running = $async->in_progress_count;
394              
395             Returns the number of items which are currently being processed asynchronously.
396              
397             =cut
398              
399             sub in_progress_count {
400 746     746 1 834 my $self = shift;
401 746         1050 $self->poke;
402 746         715 return scalar keys %{ $self->{in_progress} };
  746         2307  
403             }
404              
405             =head2 total_count
406              
407             my $total = $async->total_count;
408              
409             Returns the sum of the to_send_count, in_progress_count and to_return_count.
410              
411             This should be the total number of items which have been added that have not yet been returned by next_response().
412              
413             =cut
414              
415             sub total_count {
416 743     743 1 1889 my $self = shift;
417              
418 743         2929 my $count = 0 #
419             + $self->to_send_count #
420             + $self->in_progress_count #
421             + $self->to_return_count;
422              
423 743         2678 return $count;
424             }
425              
426             =head2 info
427              
428             print $async->info;
429              
430             Returns a line (suitable for printing) describing what the current state is.
431              
432             =cut
433              
434             sub info {
435 0     0 1 0 my $self = shift;
436              
437 0         0 return sprintf(
438             "HTTP::Async status: %4u,%4u,%4u (send, progress, return)\n",
439             $self->to_send_count, #
440             $self->in_progress_count, #
441             $self->to_return_count
442             );
443             }
444              
445             =head2 remove
446              
447             $async->remove($id);
448             my $success = $async->remove($id);
449              
450             Removes the item with the given id no matter which state it is currently in. Returns true if an item is removed, and false otherwise.
451              
452             =cut
453              
454             sub remove {
455 8     8 1 704 my $self = shift;
456 8         11 my $id = shift;
457              
458 8         13 my $hashref = delete $self->{in_progress}{$id};
459 8 100       18 if (!$hashref) {
460 5         15 for my $list ('to_send', 'to_return') {
461 10         12 my ($r_and_id) = grep { $_->[1] eq $id } @{ $self->{$list} };
  3         9  
  10         19  
462 10         7 $hashref = $r_and_id->[0];
463 10 100       24 if ($hashref) {
464 3         8 @{ $self->{$list} }
465 3         5 = grep { $_->[1] ne $id } @{ $self->{$list} };
  3         7  
  3         5  
466             }
467             }
468             }
469 8 100       20 return if !$hashref;
470              
471 6         7 my $s = $hashref->{handle};
472 6         10 $self->_io_select->remove($s);
473 6         164 delete $self->{id_opts}{$id};
474              
475 6         199 return 1;
476             }
477              
478             =head2 remove_all
479              
480             $async->remove_all;
481             my $success = $async->remove_all;
482              
483             Removes all items no matter what states they are currently in. Returns true if any items are removed, and false otherwise.
484              
485             =cut
486              
487             sub remove_all {
488 2     2 1 3 my $self = shift;
489 2 100       5 return if $self->empty;
490              
491             my @ids = (
492 0         0 (map { $_->[1] } @{ $self->{to_send} }),
  1         2  
493 1         2 (keys %{ $self->{in_progress} }),
494 1         3 (map { $_->[1] } @{ $self->{to_return} }),
  1         4  
  1         4  
495             );
496              
497 1         6 for my $id (@ids) {
498 2         4 $self->remove($id);
499             }
500              
501 1         4 return 1;
502             }
503              
504             =head2 empty, not_empty
505              
506             while ( $async->not_empty ) { ...; }
507             while (1) { ...; last if $async->empty; }
508              
509             Returns true or false depending on whether there are request or responses
510             still on the object.
511              
512             =cut
513              
514             sub empty {
515 703     703 1 2559 my $self = shift;
516 703 100       2743 return $self->total_count ? 0 : 1;
517             }
518              
519             sub not_empty {
520 3     3 1 1715 my $self = shift;
521 3         5 return !$self->empty;
522             }
523              
524             =head2 DESTROY
525              
526             The destroy method croaks if an object is destroyed but is not empty. This is
527             to help with debugging.
528              
529             =cut
530              
531             sub DESTROY {
532 30     30   9367826 my $self = shift;
533 30         67 my $class = ref $self;
534              
535 30 50       87 carp "$class object destroyed but still in use"
536             if $self->total_count;
537              
538             carp "$class INTERNAL ERROR: 'id_opts' not empty"
539 30 50       40 if scalar keys %{ $self->{id_opts} };
  30         116  
540              
541 30         971 return;
542             }
543              
544             # Go through all the values on the select list and check to see if
545             # they have been fully received yet.
546              
547             sub _process_in_progress {
548 8329     8329   5622 my $self = shift;
549 8329         8674 my %seen_ids = ();
550              
551             HANDLE:
552 8329         9836 foreach my $s ( $self->_io_select->can_read(0) ) {
553              
554             # Get the id and add it to the hash of seen ids so we don't check it
555             # later for errors.
556 232   50     7096 my $id = $self->{fileno_to_id}{ $s->fileno }
557             || die "INTERNAL ERROR: could not got id for fileno";
558 232         2016 $seen_ids{$id}++;
559              
560 232         385 my $hashref = $$self{in_progress}{$id};
561 232   100     832 my $tmp = $hashref->{tmp} ||= {};
562              
563             # warn Dumper $hashref;
564              
565             # Check that we have not timed-out.
566 232 100 66     1621 if ( time > $hashref->{timeout_at}
567             || time > $hashref->{finish_by} )
568             {
569              
570             # warn sprintf "Timeout: %.3f > %.3f", #
571             # time, $hashref->{timeout_at};
572              
573             $self->_add_error_response_to_return(
574             id => $id,
575             code => 504,
576             request => $hashref->{request},
577             previous => $hashref->{previous},
578 1         14 content => 'Timed out',
579             );
580              
581 1         4 $self->_io_select->remove($s);
582 1         67 delete $$self{fileno_to_id}{ $s->fileno };
583 1         17 next HANDLE;
584             }
585              
586             # If there is a code then read the body.
587 231 100       457 if ( $$tmp{code} ) {
588 169         287 my $buf;
589 169         560 my $n = $s->read_entity_body( $buf, 1024 * 16 ); # 16kB
590 169 100       9588 $$tmp{is_complete} = 1 unless $n;
591 169         752 $$tmp{content} .= $buf;
592              
593             # warn "Received " . length( $buf ) ;
594              
595             # warn $buf;
596             }
597              
598             # If no code try to read the headers.
599             else {
600 62         374 $s->flush;
601              
602 62         68 my ( $code, $message, %headers );
603              
604 62         103 eval {
605 62         304 ( $code, $message, %headers ) =
606             $s->read_response_headers( laxed => 1, junk_out => [] );
607             };
608              
609 62 100       13193 if ($@) {
610             $self->_add_error_response_to_return(
611             'code' => 504,
612             'content' => $@,
613             'id' => $id,
614             'request' => $hashref->{request},
615             'previous' => $hashref->{previous}
616 1         9 );
617 1         3 $self->_io_select->remove($s);
618 1         39 delete $$self{fileno_to_id}{ $s->fileno };
619 1         8 next HANDLE;
620             }
621              
622 61 50       174 if ($code) {
623              
624             # warn "Got headers: $code $message " . time;
625              
626 61         152 $$tmp{code} = $code;
627 61         142 $$tmp{message} = $message;
628 61         184 my @headers_array = map { $_, $headers{$_} } keys %headers;
  233         360  
629 61         214 $$tmp{headers} = \@headers_array;
630              
631             }
632             }
633              
634             # Reset the timeout.
635 230         742 $hashref->{timeout_at} = time + $self->_get_opt( 'timeout', $id );
636             # warn "recieved - timeout set to '$hashref->{timeout_at}'";
637              
638             # If the message is complete then create a request and add it
639             # to 'to_return';
640 230 100       544 if ( $$tmp{is_complete} ) {
641 59         141 delete $$self{fileno_to_id}{ $s->fileno };
642 59         287 $self->_io_select->remove($s);
643              
644             # warn Dumper $$hashref{content};
645              
646             my $response = HTTP::Response->new(
647 59         2290 @$tmp{ 'code', 'message', 'headers', 'content' } );
648              
649 59         8510 $response->request( $hashref->{request} );
650 59 100       461 $response->previous( $hashref->{previous} ) if $hashref->{previous};
651              
652             # Deal with cookies
653 59         241 my $jar = $self->_get_opt('cookie_jar', $id);
654 59 100       114 if ($jar) {
655 1         12 $jar->extract_cookies($response);
656             }
657              
658             # If it was a redirect and there are still redirects left
659             # create a new request and unshift it onto the 'to_send'
660             # array.
661             # Only redirect GET and HEAD as per RFC 2616.
662             # http://www.w3.org/Protocols/rfc2616/rfc2616-sec10.html
663 59         518 my $code = $response->code;
664 59         353 my $get_or_head = $response->request->method =~ m{^(?:GET|HEAD)$};
665              
666 59 100 100     966 if (
      33        
      66        
      66        
      66        
667             $response->is_redirect # is a redirect
668             && $hashref->{redirects_left} > 0 # and we still want to follow
669             && ($get_or_head || $code !~ m{^30[127]$}) # must be GET or HEAD if it's 301, 302 or 307
670             && $code != 304 # not a 'not modified' reponse
671             && $code != 305 # not a 'use proxy' response
672             )
673             {
674              
675 30         393 $hashref->{redirects_left}--;
676              
677 30         80 my $loc = $response->header('Location');
678 30         826 my $uri = $response->request->uri;
679              
680 30 50 33     288 warn "Problem: " . Dumper( { loc => $loc, uri => $uri } )
      33        
      33        
681             unless $uri && ref $uri && $loc && !ref $loc;
682              
683 30         323 my $url = _make_url_absolute( url => $loc, ref => $uri );
684              
685 30         4857 my $request = $response->request->clone;
686 30         2784 $request->uri($url);
687              
688             # These headers should never be forwarded
689 30         1427 $request->remove_header('Host', 'Cookie');
690              
691             # Don't leak private information.
692             # http://www.w3.org/Protocols/rfc2616/rfc2616-sec15.html#sec15.1.3
693 30 0 33     577 if ($request->header('Referer') &&
      33        
694             $hashref->{request}->uri->scheme eq 'https' &&
695             $request->uri->scheme eq 'http') {
696              
697 0         0 $request->remove_header('Referer');
698             }
699              
700             # See Other should use GET
701 30 50 33     775 if ($code == 303 && !$get_or_head) {
702 0         0 $request->method('GET');
703 0         0 $request->content('');
704 0         0 $request->remove_content_headers;
705             }
706              
707 30         82 $self->_send_request( [ $request, $id ] );
708 30         65 $hashref->{previous} = $response;
709             }
710             else {
711 29         497 $self->_add_to_return_queue( [ $response, $id ] );
712 29         119 delete $$self{in_progress}{$id};
713             }
714              
715 59         280 delete $hashref->{tmp};
716             }
717             }
718              
719             # warn Dumper(
720             # {
721             # in_progress => $self->{in_progress},
722             # seen_ids => \%seen_ids,
723             # }
724             # );
725              
726 8329         46612 foreach my $id ( keys %{ $self->{in_progress} } ) {
  8329         13344  
727              
728             # skip this one if it was processed above.
729 7546 100       9976 next if $seen_ids{$id};
730              
731 7345         5612 my $hashref = $self->{in_progress}{$id};
732              
733 7345 100 100     29007 if ( time > $hashref->{timeout_at}
734             || time > $hashref->{finish_by} )
735             {
736              
737             # warn Dumper( { hashref => $hashref, now => time } );
738              
739             # we have a request that has timed out - handle it
740             $self->_add_error_response_to_return(
741             id => $id,
742             code => 504,
743             request => $hashref->{request},
744             previous => $hashref->{previous},
745 3         40 content => 'Timed out',
746             );
747              
748 3         8 my $s = $hashref->{handle};
749 3         8 $self->_io_select->remove($s);
750 3         177 delete $$self{fileno_to_id}{ $s->fileno };
751             }
752             }
753              
754 8329         8961 return 1;
755             }
756              
757             sub _add_to_return_queue {
758 36     36   53 my $self = shift;
759 36         48 my $req_and_id = shift;
760 36         50 push @{ $$self{to_return} }, $req_and_id;
  36         91  
761 36         54 return 1;
762             }
763              
764             # Add all the items waiting to be sent to 'to_send' up to the 'slots'
765             # limit.
766              
767             sub _process_to_send {
768 8329     8329   5945 my $self = shift;
769              
770 8329   66     5709 while ( scalar @{ $$self{to_send} }
  8368         16336  
771 39         176 && $self->slots > scalar keys %{ $$self{in_progress} } )
772             {
773 39         47 $self->_send_request( shift @{ $$self{to_send} } );
  39         155  
774             }
775              
776 8329         6500 return 1;
777             }
778              
# Open a non-blocking connection for one queued request and write the
# request to it.
#
# $r_and_id is an arrayref [ $request, $id ]. Options are looked up per
# request with $self->_get_opt( $name, $id ).
#
# On success the socket is added to the IO::Select object, its fileno is
# mapped back to $id in 'fileno_to_id', and the 'in_progress' entry for $id
# is filled in with the request, the start time, the 'timeout_at' and
# 'finish_by' deadlines, the handle and (if not already present) the
# remaining redirect count.
#
# If the connection object cannot be created, a 503 response whose content
# is the connection error is queued instead. Croaks if the request cannot
# be written to the new connection. Always returns 1.
sub _send_request {
    my ( $self, $r_and_id ) = @_;
    my ( $request, $id ) = @$r_and_id;

    my $uri = URI->new( $request->uri );

    my %args = ();

    # Get cookies from jar if one exists
    my $jar = $self->_get_opt( 'cookie_jar', $id );
    $jar->add_cookie_header($request) if $jar;

    # We need to use a different request_uri for proxied requests. Decide to
    # use this if a proxy port or host is set.
    #
    # http://www.w3.org/Protocols/rfc2616/rfc2616-sec5.html#sec5.1.2
    $args{Host}      = $uri->host;
    $args{PeerAddr}  = $self->_get_opt( 'proxy_host', $id );
    $args{PeerPort}  = $self->_get_opt( 'proxy_port', $id );
    $args{LocalAddr} = $self->_get_opt( 'local_addr', $id );
    $args{LocalPort} = $self->_get_opt( 'local_port', $id );

    # https://rt.cpan.org/Public/Bug/Display.html?id=33071
    $args{Timeout} = $self->_get_opt( 'timeout', $id );

    # ACF - Pass ssl_options through
    $args{ssl_opts} = $self->_get_opt( 'ssl_options', $id );

    # If either a proxy host or a proxy port is set, this is a proxy request.
    my $request_is_to_proxy = ( $args{PeerAddr} || $args{PeerPort} ) ? 1 : 0;

    # If we did not get a setting from the proxy then use the uri values.
    $args{PeerAddr} ||= $uri->host;
    my $peer_address = $self->_get_opt( 'peer_addr', $id );
    $args{PeerAddr} = $peer_address if $peer_address;

    $args{PeerPort} ||= $uri->port;

    my $scheme   = $uri->scheme;
    my $is_https = $scheme && $scheme eq 'https';

    my $net_http_class = 'Net::HTTP::NB';
    if ( $is_https and not $request_is_to_proxy ) {
        $net_http_class = 'Net::HTTPS::NB';
        eval {
            require Net::HTTPS::NB;
            Net::HTTPS::NB->import();
        };
        die "$net_http_class must be installed for https support" if $@;

        # Add SSL options, if any, to args. Look them up for this request's
        # $id, exactly like the 'ssl_opts' lookup above, so per-request
        # ssl_options are honoured here too.
        my $ssl_options = $self->_get_opt( 'ssl_options', $id );
        @args{ keys %$ssl_options } = values %$ssl_options if $ssl_options;
    }
    elsif ( $is_https and $request_is_to_proxy ) {

        # We are making an HTTPS request through an HTTP proxy such as squid.
        # The proxy will handle the HTTPS, we need to connect to it via HTTP
        # and then make a request where the https is clear from the scheme...
        $args{Host} =
            sprintf( '%s:%s', delete @args{ 'PeerAddr', 'PeerPort' } );
    }

    # The IO::Socket-based constructor reports a failed connect by returning
    # undef with the reason in $@, without dying. A successful eval {} resets
    # $@ to '', so the reason has to be captured inside the eval; otherwise
    # the 503 response below would carry empty content.
    my $connect_error = '';
    my $s = eval {
        my $handle = $net_http_class->new(%args);
        $connect_error = $@ unless $handle;
        $handle;
    };
    $connect_error = $@ if $@;    # new() itself died

    # We could not create a request - fake up a 503 response with
    # error as content.
    if ( !$s ) {
        $self->_add_error_response_to_return(
            id       => $id,
            code     => 503,
            request  => $request,
            previous => $$self{in_progress}{$id}{previous},
            content  => $connect_error,
        );

        return 1;
    }

    # Flatten the request headers into field => value pairs for
    # write_request, using the public headers() accessor rather than the
    # object's private '_headers' slot.
    my %headers;
    for my $key ( $request->headers->header_field_names ) {
        $headers{$key} = $request->header($key);
    }

    # Decide what to use as the request_uri: the full URL for a proxy,
    # otherwise the URL with scheme, host and port stripped off.
    my $request_uri =
          $request_is_to_proxy
        ? $uri->as_string
        : _strip_host_from_uri($uri);

    croak "Could not write request to $uri '$!'"
        unless $s->write_request( $request->method, $request_uri, %headers,
        $request->content );

    $self->_io_select->add($s);

    my $time  = time;
    my $entry = $$self{in_progress}{$id} ||= {};

    $$self{fileno_to_id}{ $s->fileno } = $id;

    $entry->{request}    = $request;
    $entry->{started_at} = $time;
    $entry->{timeout_at} = $time + $self->_get_opt( 'timeout', $id );
    $entry->{finish_by}  = $time + $self->_get_opt( 'max_request_time', $id );
    $entry->{handle}     = $s;

    # Only set the redirect budget when the entry does not already have
    # one, so an existing remaining count is preserved.
    $entry->{redirects_left} = $self->_get_opt( 'max_redirect', $id )
        unless exists $entry->{redirects_left};

    return 1;
}
900              
# Turn an absolute URI object into the request-target sent to an origin
# server: everything after "scheme://authority", always starting with "/".
# Returns a plain string.
sub _strip_host_from_uri {
    my ($uri) = @_;

    my $origin = $uri->scheme . '://' . $uri->authority;
    ( my $target = $uri->as_string ) =~ s/^\Q$origin\E//;

    return $target =~ m{^/} ? $target : "/$target";
}
912              
# Return this object's IO::Select set, creating it the first time it is
# needed and reusing it on every later call.
sub _io_select {
    my ($self) = @_;

    $self->{io_select} = IO::Select->new() unless $self->{io_select};

    return $self->{io_select};
}
917              
# Resolve a possibly relative URL against a base URL.
# Named arguments: url => the URL to resolve, ref => the base URL.
# Returns the absolute URL as a string.
sub _make_url_absolute {
    my (%args) = @_;

    my ( $relative, $base ) = @args{qw( url ref )};
    my $absolute = URI->new_abs( $relative, $base );

    return $absolute->as_string;
}
926              
# Build a synthetic error HTTP::Response and hand it back to the caller.
#
# Named arguments:
#   id       - id of the request this response belongs to
#   code     - HTTP status code (the message comes from status_message)
#   request  - the HTTP::Request to attach to the response
#   previous - optional previous response; attached only when true
#   content  - response body
#
# Queues [ $response, $id ], removes the 'in_progress' entry for $id and
# returns the new response.
sub _add_error_response_to_return {
    my ( $self, %args ) = @_;

    use HTTP::Status;

    my $status = $args{code};
    my $id     = $args{id};

    my $response = HTTP::Response->new(
        $status, status_message($status), undef, $args{content}
    );

    $response->request( $args{request} );
    if ( $args{previous} ) {
        $response->previous( $args{previous} );
    }

    $self->_add_to_return_queue( [ $response, $id ] );
    delete $self->{in_progress}{$id};

    return $response;
}
946              
947             =head1 SEE ALSO
948              
L<HTTP::Async::Polite> - a polite form of this module. It slows the scraping
down by domain so that the remote server is not overloaded.
951              
952             =head1 GOTCHAS
953              
The responses may not come back in the same order as the requests were made.
For https requests to work, you must have L<Net::HTTPS::NB> installed.
956              
957             =head1 THANKS
958              
Egor Egorov contributed patches for proxies, for catching connections that die
before the headers are sent, and more.
961              
962             Tomohiro Ikebe from livedoor.jp submitted patches (and a test) to properly
963             handle 304 responses.
964              
965             Naveed Massjouni for adding the https handling code.
966              
967             Alex Balhatchet for adding the https + proxy handling code, and for making the
968             tests run ok in parallel.
969              
970             Josef Toman for fixing two bugs, one related to header handling and another
971             related to producing an absolute URL correctly.
972              
973             Github user 'c00ler-' for adding LocalAddr and LocalPort support.
974              
rt.cpan.org user 'Florian (fschlich)' for reporting a typo in the documentation.
976              
977             Heikki Vatiainen for the ssl_options support patch.
978              
Daniel Lintott of the Debian Perl Group for pointing out a test failure when
using a very recent version of HTTP::Server::Simple to implement
t/TestServer.pm.
982              
983             =head1 BUGS AND REPO
984              
Please submit all bugs, patches, etc. on GitHub:

L<https://github.com/evdb/HTTP-Async>
988              
989             =head1 AUTHOR
990              
Edmund von der Burg C<< <evdb@ecclestoad.co.uk> >>.
992              
L<http://www.ecclestoad.co.uk/>
994              
995             =head1 LICENCE AND COPYRIGHT
996              
Copyright (c) 2006, Edmund von der Burg C<< <evdb@ecclestoad.co.uk> >>.
998             All rights reserved.
999              
1000             This module is free software; you can redistribute it and/or modify it under
1001             the same terms as Perl itself.
1002              
1003             =head1 DISCLAIMER OF WARRANTY
1004              
1005             BECAUSE THIS SOFTWARE IS LICENSED FREE OF CHARGE, THERE IS NO WARRANTY FOR THE
1006             SOFTWARE, TO THE EXTENT PERMITTED BY APPLICABLE LAW. EXCEPT WHEN OTHERWISE
1007             STATED IN WRITING THE COPYRIGHT HOLDERS AND/OR OTHER PARTIES PROVIDE THE
1008             SOFTWARE "AS IS" WITHOUT WARRANTY OF ANY KIND, EITHER EXPRESSED OR IMPLIED,
1009             INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
1010             FITNESS FOR A PARTICULAR PURPOSE. THE ENTIRE RISK AS TO THE QUALITY AND
1011             PERFORMANCE OF THE SOFTWARE IS WITH YOU. SHOULD THE SOFTWARE PROVE DEFECTIVE,
1012             YOU ASSUME THE COST OF ALL NECESSARY SERVICING, REPAIR, OR CORRECTION.
1013              
1014             IN NO EVENT UNLESS REQUIRED BY APPLICABLE LAW OR AGREED TO IN WRITING WILL ANY
1015             COPYRIGHT HOLDER, OR ANY OTHER PARTY WHO MAY MODIFY AND/OR REDISTRIBUTE THE
1016             SOFTWARE AS PERMITTED BY THE ABOVE LICENCE, BE LIABLE TO YOU FOR DAMAGES,
1017             INCLUDING ANY GENERAL, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING
1018             OUT OF THE USE OR INABILITY TO USE THE SOFTWARE (INCLUDING BUT NOT LIMITED TO
1019             LOSS OF DATA OR DATA BEING RENDERED INACCURATE OR LOSSES SUSTAINED BY YOU OR
1020             THIRD PARTIES OR A FAILURE OF THE SOFTWARE TO OPERATE WITH ANY OTHER
1021             SOFTWARE), EVEN IF SUCH HOLDER OR OTHER PARTY HAS BEEN ADVISED OF THE
1022             POSSIBILITY OF SUCH DAMAGES.
1023              
1024             =cut
1025              
# The module file ends with a true value so that require/use succeeds.
1;
1027