File Coverage

blib/lib/HTTP/Client/Parallel.pm
Criterion Covered Total %
statement 100 153 65.3
branch 12 52 23.0
condition 9 28 32.1
subroutine 27 32 84.3
pod 0 7 0.0
total 148 272 54.4


line stmt bran cond sub pod time code
1             package HTTP::Client::Parallel;
2              
3             =pod
4              
5             =head1 NAME
6              
7             HTTP::Client::Parallel - A HTTP client that fetchs all URIs in parallel
8              
9             =head1 SYNOPSIS
10              
11             # Create the parallising client
12             my $client = HTTP::Client::Parallel->new;
13            
14             # Simple fetching
15             my $pages = $client->get(
16             'http://www.google.com/',
17             'http://www.yapc.org/',
18             'http://www.yahoo.com/',
19             );
20            
21             # Mirroring to disk
22             my $responses = $client->mirror(
23             'http://www.google.com/' => 'mirrors/google.html',
24             'http://www.yapc.org/' => 'mirrors/yapc.html',
25             'http://www.yahoo.com/' => 'mirrors/yahoo.html',
26             );
27              
28             =head1 DESCRIPTION
29              
30             Fetching a URI is a very common network-bound task in many types of
31             programming. Fetching more than one URI is also very common, but unless
32             the fetches are capable of entirely saturating a connection, typically
33             time is wasted because there is often no logical reason why multiple
34             requests cannot be made in parallel.
35              
36             Executing IO-bound and network-bound tasks is extremely easy in any
37             event-based programming model such as L, but these event-based
38             systems normally require complete control of the application and
39             that the program be written in a very different way.
40              
41             Thus, the biggest problem preventing running HTTP requests in
42             parallel is not that it isn't possible, but that mixing procedural
43             and event programming is difficult.
44              
45             The few existing mechanisms generally rely on forking or other
46             platform-specific methods.
47              
48             B is designed to bridge the gap between
49             typical cross-platform procedural code and typical cross-platform
50             event-based code.
51              
52             It allows you to set up a series of HTTP tasks (fetching to memory,
53             fetching to disk, and mirroring to disk) and then issue a single
54             method call which will block and execute all of them in parallel.
55              
56             Behind the scenes HTTP::Client::Parallel will B hand
57             over control of the process to L to execute the HTTP tasks.
58              
59             Once all of the HTTP tasks are completed (using the standard
60             L module, the POE kernel will shut
61             down and hand control of the application back to the normal
62             procedural code, and thus back to your code.
63              
64             As a result, a developer with no knowledge of L or event-based
65             programming can still take advantage of the capabilities of POE and
66             gain major speed increases in HTTP-based programs with relatively
67             little work.
68              
69             =head1 METHODS
70              
71             TO BE COMPLETED
72              
73             =cut
74              
75 2     2   565699 use 5.006;
  2         7  
  2         113  
76 2     2   14 use strict;
  2         4  
  2         84  
77 2     2   23 use warnings;
  2         5  
  2         67  
78 2     2   11 use Exporter ();
  2         3  
  2         45  
79 2     2   973 use IO::File ();
  2         12681  
  2         67  
80 2     2   3208 use Params::Util '_INSTANCE';
  2         8947  
  2         197  
81 2     2   1357 use HTTP::Date ();
  2         8198  
  2         50  
82 2     2   1418 use HTTP::Request ();
  2         32021  
  2         47  
83 2     2   4276 use POE;
  2         82874  
  2         15  
84 2     2   338826 use POE::Session;
  2         6  
  2         11  
85 2     2   3300 use POE::Component::Client::HTTP;
  2         255329  
  2         72  
86              
87 2     2   22 use constant HCP => __PACKAGE__;
  2         5  
  2         145  
88 2     2   12 use constant DEFAULT_REDIRECT_DEPTH => 2;
  2         3  
  2         98  
89 2     2   12 use constant DEFAULT_TIMEOUT => 60;
  2         5  
  2         113  
90              
91 2     2   14 use vars qw{$VERSION @ISA @EXPORT_OK};
  2         6  
  2         243  
92             BEGIN {
93 2     2   4 $VERSION = '0.02';
94 2         44 @ISA = 'Exporter';
95 2         3317 @EXPORT_OK = qw{ mirror getstore get };
96             }
97              
98             sub new {
99 2     2 0 23 my ( $class, %args ) = @_;
100 2   50     79 return bless {
      50        
      50        
      50        
101             requests => {},
102             results => {},
103             count => 0,
104             debug => $args{debug} || 0,
105             http_alias => $args{http_alias} || 'ua',
106             timeout => $args{timeout} || DEFAULT_TIMEOUT,
107             redirect_depth => $args{redirect_depth} || DEFAULT_REDIRECT_DEPTH,
108             }, $class;
109             }
110              
111             sub urls {
112 2 50   2 0 9 return wantarray ? @{ $_[0]->{urls} } : $_[0]->{urls};
  2         14  
113             }
114              
115             sub get {
116 2 100   2 0 3470 my $self = _INSTANCE($_[0], HCP) ? shift : HCP->new;
117 2         12 $self->_init;
118 2         36 $self->_set_urls(@_);
119 2         7 $self->poe_loop;
120 2         15 my @responses = map { $self->{responses}{$_} } $self->urls;
  2         13  
121 2 50       65 return wantarray ? @responses : \@responses
122              
123             # XXX from the pod, this may be the desired behavior
124             # my @content = map { $self->{responses}{$_}->content } $self->urls;
125             # return wantarray ? @content : \@content
126             }
127              
128             sub getstore {
129 0 0   0 0 0 my $self = _INSTANCE($_[0], HCP) ? shift : HCP->new;
130 0         0 my %url_file_map = @_;
131 0         0 $self->_init;
132 0         0 $self->_set_urls( keys %url_file_map );
133 0         0 $self->{local_files} = \%url_file_map;
134 0         0 $self->poe_loop;
135 0         0 my @responses = values %{ $self->{responses} };
  0         0  
136 0 0       0 return wantarray ? @responses : \@responses;
137             }
138              
139             sub mirror {
140 0 0   0 0 0 my $self = _INSTANCE($_[0], HCP) ? shift : HCP->new;
141 0         0 my %url_file_map = @_;
142 0         0 $self->_init;
143 0         0 $self->_set_urls( keys %url_file_map );
144 0         0 $self->{local_files} = \%url_file_map;
145 0         0 $self->_build_modified_since( \%url_file_map );
146 0         0 $self->poe_loop;
147 0         0 my @responses = values %{ $self->{responses} };
  0         0  
148 0 0       0 return wantarray ? @responses : \@responses
149             }
150              
151             # cleanup between uses just in case
152             sub _init {
153 2     2   5 my ($self) = @_;
154 2         5 $self->{count} = 0;
155 2         4 delete @{$self}{qw(requests responses urls url_count local_files modified_since)};
  2         9  
156 2         4 return;
157             }
158              
159             sub _set_urls {
160 2     2   9 my ( $self, @urls ) = @_;
161 2         5 $self->{urls} = \@urls;
162 2         11 $self->{url_count} = @urls;
163 2         9 return;
164             }
165              
166             sub _build_modified_since {
167 0     0   0 my ( $self, $url_file_map ) = @_;
168 0         0 for my $url ( keys %$url_file_map ) {
169 0         0 my $file = $url_file_map->{$url};
170 0 0       0 if ( -e $file ) {
171 0         0 my ($mtime) = ( stat($file) )[9];
172 0 0       0 $self->{modified_since}{$url} = HTTP::Date::time2str($mtime) if $mtime;
173             }
174             }
175             }
176              
177             sub _store_local_file {
178 0     0   0 my ( $self, $response, $file ) = @_;
179              
180 0         0 my $tmpfile = "$file-$$";
181              
182 0 0       0 open my $tmp_fh, ">", $tmpfile or die "Can't open temp file $tmpfile for writing: $!";
183 0         0 print $tmp_fh $response->content;
184 0         0 close $tmp_fh;
185              
186             # the following is taken from LWP::UserAgent->mirror
187 0         0 my $file_length = ( stat($tmpfile) )[7];
188 0         0 my ($content_length) = $response->header('Content-length');
189              
190 0 0 0     0 if ( defined $content_length and $file_length < $content_length ) {
    0 0        
191 0         0 unlink($tmpfile);
192 0         0 die "Transfer truncated: only $file_length out of $content_length bytes received\n";
193             }
194             elsif ( defined $content_length and $file_length > $content_length ) {
195 0         0 unlink($tmpfile);
196 0         0 die "Content-length mismatch: expected $content_length bytes, got $file_length\n";
197             }
198             else { # OK
199 0 0       0 if ( -e $file ) {
200 0         0 chmod 0777, $file; # Some dosish systems fail to rename if the target exists
201 0         0 unlink $file;
202             }
203 0 0       0 rename( $tmpfile, $file ) or die "Cannot rename '$tmpfile' to '$file': $!\n";
204              
205 0 0       0 if ( my $lm = $response->last_modified ) {
206 0         0 utime $lm, $lm, $file; # make sure the file has the same last modification time
207             }
208             }
209              
210 0         0 return;
211             }
212              
213             # POE
214             sub poe_loop {
215 2     2 0 4 my $self = shift;
216              
217 2   50     37 POE::Component::Client::HTTP->spawn(
218             Alias => $self->{http_alias} || 'ua',
219             Timeout => $self->{timeout},
220             FollowRedirects => $self->{redirect_depth},
221             );
222              
223 2         4333 POE::Session->create( object_states => [
224             $self => [qw( _start _request _response shutdown _stop)]
225             ]);
226              
227 2         387 POE::Kernel->run;
228              
229 2         3119 return;
230             }
231              
232             sub _start {
233 2     2   367 my ( $self, $kernel ) = @_[ OBJECT, KERNEL ];
234 2         19 $kernel->alias_set("$self");
235 2         78 $kernel->yield( _request => $_ ) for @{ $self->{urls} };;
  2         19  
236 2         128 return;
237             }
238              
239             sub _build_request {
240 2     2   4 my ( $self, $url ) = @_;
241            
242 2         3 my $request;
243 2 50 33     34 if ( Scalar::Util::blessed($url) and $url->isa('HTTP::Request') ) {
    50 33        
      33        
244 0         0 $request = $url;
245             }
246             elsif ( ( Scalar::Util::blessed($url) and $url->isa('URI') ) or !ref $url ) {
247 2         35 $request = HTTP::Request->new( GET => $url );
248             }
249             else {
250 0         0 die "[!!] invalid URI, HTTP::Request or url string: $url\n";
251             }
252              
253 2 50 33     402 if ( $self->{modified_since} && $self->{modified_since}{$url} ) {
254 0         0 $request->header( 'If-Modified-Since' => $self->{modified_since}{$url} );
255             }
256              
257 2         4 return $request;
258             }
259              
260             sub _request {
261 2     2   570 my ( $self, $kernel, $url ) = @_[ OBJECT, KERNEL, ARG0 ];
262              
263 2         7 my $request = $self->_build_request($url);
264 2 50       8 warn '[' . $request->uri . '] Attempting to fetch' . "\n" if $self->{debug};
265 2         14 $kernel->post( $self->{http_alias}, 'request', '_response', $request );
266              
267 2         223 return;
268             }
269              
270             sub _response {
271 2     2   800643 my ( $self, $kernel, $request_packet, $response_packet ) = @_[ OBJECT, KERNEL, ARG0, ARG1 ];
272              
273 2         12 my $request = $request_packet->[0];
274 2         4 my $response = $response_packet->[0];
275 2         26 $self->{responses}{ $request->uri } = $response;
276 2         40 $self->{count}++;
277              
278 2 50       21 if ( $response->is_success ) {
    0          
279 2 50       40 warn '[' . $request->uri . "] Fetched\n" if $self->{debug};
280 2 50       20 if ( my $local_file = $self->{local_files}{ $request->uri } ) {
281 0         0 $self->_store_local_file( $response, $local_file );
282             }
283             }
284             elsif ( $response->code == 304 ) {
285 0 0       0 warn '[' . $request->uri . "] Not Modified\n" if $self->{debug};
286             }
287             else {
288 0 0       0 warn '[' . $request->uri . "] HTTP Response Code: " . $response->code . "\n"
289             if $self->{debug};
290             }
291              
292 2 50       42 $kernel->call( ua => 'shutdown' ) if $self->{url_count} == $self->{count};
293              
294 2         23708 return;
295             }
296              
297             sub shutdown {
298 0     0 0 0 my ( $self, $kernel ) = @_[ OBJECT, KERNEL ];
299 0         0 $kernel->alias_remove( $self->{http_alias} );
300 0         0 return;
301             }
302              
303             sub _stop {
304 2     2   18552 my $self = $_[OBJECT];
305             }
306              
307             1;
308              
309             =pod
310              
311             =head1 SUPPORT
312              
313             Bugs should be reported via the CPAN bug tracker at
314              
315             L
316              
317             For other issues, contact the author.
318              
319             =head1 AUTHORS
320              
321             Marlon Bailey Embailey@cpan.orgE
322              
323             Adam Kennedy Eadamk@cpan.orgE
324              
325             Jeff Bisbee Ejbisbee@cpan.orgE
326              
327             =head1 SEE ALSO
328              
329             L, L
330              
331             =head1 COPYRIGHT
332              
333             Copyright 2008 Marlon Bailey, Adam Kennedy and Jess Bisbee.
334              
335             This program is free software; you can redistribute
336             it and/or modify it under the same terms as Perl itself.
337              
338             The full text of the license can be found in the
339             LICENSE file included with this module.
340              
341             =cut