File Coverage

blib/lib/Parallel/Downloader.pm
Criterion Covered Total %
statement 81 88 92.0
branch 8 14 57.1
condition 1 2 50.0
subroutine 15 16 93.7
pod 2 2 100.0
total 107 122 87.7


line stmt bran cond sub pod time code
1 1     1   53223 use strictures;
  1         3  
  1         9  
2              
3             package Parallel::Downloader;
4              
5             our $VERSION = '0.132071'; # VERSION
6              
7             # ABSTRACT: simply download multiple files at once
8              
9             #
10             # This file is part of Parallel-Downloader
11             #
12             #
13             # Christian Walde has dedicated the work to the Commons by waiving all of his
14             # or her rights to the work worldwide under copyright law and all related or
15             # neighboring legal rights he or she had in the work, to the extent allowable by
16             # law.
17             #
18             # Works under CC0 do not require attribution. When citing the work, you should
19             # not imply endorsement by the author.
20             #
21              
22              
23 1     1   957 use Moo;
  1         13447  
  1         6  
24 1     1   2504 use MooX::Types::MooseLike::Base qw( Bool Int HashRef CodeRef ArrayRef );
  1         6532  
  1         495  
25              
26             sub {
27             has requests => ( is => 'ro', isa => ArrayRef, required => 1 );
28             has workers => ( is => 'ro', isa => Int, default => sub { 10 } );
29             has conns_per_host => ( is => 'ro', isa => Int, default => sub { 4 } );
30             has aehttp_args => ( is => 'ro', isa => HashRef, default => sub { {} } );
31             has debug => ( is => 'ro', isa => Bool, default => sub { 0 } );
32             has logger => ( is => 'ro', isa => CodeRef, default => sub { \&_default_log } );
33             has build_response => ( is => 'ro', isa => CodeRef, default => sub { \&_default_build_response } );
34             has sorted => ( is => 'ro', isa => Bool, default => sub { 1 } );
35              
36             has _consumables => ( is => 'lazy', isa => ArrayRef, builder => '_requests_interleaved_by_host' );
37              
38             has _responses => ( is => 'ro', isa => ArrayRef, default => sub { [] } );
39             has _cv => ( is => 'ro', isa => sub { $_[0]->isa( 'AnyEvent::CondVar' ) }, default => sub { AnyEvent->condvar } );
40             }
41             ->();
42              
43 1     1   10 use AnyEvent::HTTP;
  1         1  
  1         88  
44 1     1   875 use Sub::Exporter::Simple 'async_download';
  1         21098  
  1         9  
45              
46              
47             sub async_download {
48 1     1 1 287 return __PACKAGE__->new( @_ )->run;
49             }
50              
51             sub _requests_interleaved_by_host {
52 1     1   650 my ( $self, $requests ) = @_;
53              
54 1         2 my %hosts;
55 1         3 for ( @{ $self->requests } ) {
  1         4  
56 3         16 my $host_name = $_->uri->host;
57 3   50     310 my $host = $hosts{$host_name} ||= [];
58 3         4 push @{$host}, $_;
  3         9  
59             }
60              
61 1         3 my @interleaved_list;
62 1         5 while ( keys %hosts ) {
63 1         4 push @interleaved_list, shift @{$_} for values %hosts;
  3         8  
64 1         4 for ( keys %hosts ) {
65 3 50       4 next if @{ $hosts{$_} };
  3         9  
66 3         10 delete $hosts{$_};
67             }
68             }
69              
70 1         26 return \@interleaved_list;
71             }
72              
73              
74             sub run {
75 1     1 1 51 my ( $self ) = @_;
76              
77 1         6 local $AnyEvent::HTTP::MAX_PER_HOST = $self->conns_per_host;
78              
79 1         6 for ( 1 .. $self->_sanitize_worker_max ) {
80 3         19 $self->_cv->begin;
81 3         88 $self->_log( msg => "$_ started", type => "WorkerStart", worker_id => $_ );
82 3         8 $self->_add_request( $_ );
83             }
84              
85 1         10 $self->_cv->recv;
86              
87 1 50       15 return @{ $self->_responses } if !$self->sorted;
  0         0  
88              
89 1         3 my %unsorted = map { 0 + $_->[2] => $_ } @{ $self->_responses };
  3         24  
  1         5  
90 1         4 my @sorted = map { $unsorted{ 0 + $_ } } @{ $self->requests };
  3         7  
  1         5  
91              
92 1         7 return @sorted;
93             }
94              
95             sub _add_request {
96 6     6   20 my ( $self, $worker_id ) = @_;
97              
98 6         6 my $req = shift @{ $self->_consumables };
  6         182  
99 6 100       101 return $self->_end_worker( $worker_id ) if !$req;
100              
101 3         7 my $post_download_sub = $self->_make_post_download_sub( $worker_id, $req );
102              
103 3         89 http_request(
104             $req->method,
105             $req->uri->as_string,
106             body => $req->content,
107             headers => $req->{_headers},
108 3         10 %{ $self->aehttp_args },
109             $post_download_sub
110             );
111              
112 3         138 my $host_name = $req->uri->host;
113 3         70 $self->_log(
114             msg => "$worker_id accepted new request for $host_name",
115             type => "WorkerRequestAdd",
116             worker_id => $worker_id,
117             req => $req
118             );
119              
120 3         7 return;
121             }
122              
123             sub _make_post_download_sub {
124 3     3   6 my ( $self, $worker_id, $req ) = @_;
125              
126             my $post_download_sub = sub {
127 3     3   279438 push @{ $self->_responses }, $self->build_response->( @_, $req );
  3         56  
128              
129 3         31 my $host_name = $req->uri->host;
130 3         216 $self->_log(
131             msg => "$worker_id completed a request for $host_name",
132             type => "WorkerRequestEnd",
133             worker_id => $worker_id,
134             req => $req
135             );
136              
137 3         15 $self->_add_request( $worker_id );
138 3         35 return;
139 3         12 };
140              
141 3         5 return $post_download_sub;
142             }
143              
144             sub _default_build_response {
145 3     3   8 my ( $body, $hdr, $req ) = @_;
146 3         18 return [ $body, $hdr, $req ];
147             }
148              
149             sub _end_worker {
150 3     3   8 my ( $self, $worker_id ) = @_;
151 3         20 $self->_log( msg => "$worker_id ended", type => "WorkerEnd", worker_id => $worker_id );
152 3         44 $self->_cv->end;
153 3         55 return;
154             }
155              
156             sub _sanitize_worker_max {
157 1     1   2 my ( $self ) = @_;
158              
159 1 50       8 die "max should be 0 or more" if $self->workers < 0;
160              
161 1         5 my $request_count = @{ $self->requests };
  1         6  
162              
163 1 50       7 return $request_count if !$self->workers; # 0 = as many parallel as possible
164 1 50       13 return $request_count if $self->workers > $request_count; # not more than the request count
165              
166 0         0 return $self->workers;
167             }
168              
169             sub _log {
170 12     12   58 my ( $self, %msg ) = @_;
171 12 50       62 return if !$self->debug;
172 0           $self->logger->( $self, \%msg );
173 0           return;
174             }
175              
176             sub _default_log {
177 0     0     my ( $self, $msg ) = @_;
178 0           print "$msg->{msg}\n";
179 0           return;
180             }
181              
182             1;
183              
184             __END__