File Coverage

blib/lib/Net/Hadoop/WebHDFS/LWP.pm
Criterion Covered Total %
statement 27 125 21.6
branch 0 58 0.0
condition 0 27 0.0
subroutine 9 13 69.2
pod 1 2 50.0
total 37 225 16.4


line stmt bran cond sub pod time code
1             package Net::Hadoop::WebHDFS::LWP;
2             $Net::Hadoop::WebHDFS::LWP::VERSION = '0.009';
3 1     1   54429 use strict;
  1         1  
  1         25  
4 1     1   3 use warnings;
  1         1  
  1         27  
5 1     1   4 use parent 'Net::Hadoop::WebHDFS';
  1         1  
  1         5  
6              
7             # VERSION
8              
9 1     1   37681 use LWP::UserAgent;
  1         38936  
  1         29  
10 1     1   7 use Carp;
  1         1  
  1         98  
11 1     1   400 use Ref::Util qw( is_arrayref );
  1         497  
  1         101  
12 1     1   5 use Scalar::Util qw( openhandle );
  1         1  
  1         41  
13 1     1   388 use HTTP::Request::StreamingUpload;
  1         362  
  1         29  
14              
15 1         1006 use constant UA_PASSTHROUGH_OPTIONS => qw(
16             env_proxy
17             no_proxy
18             proxy
19 1     1   4 );
  1         1  
20              
21             sub new {
22 0     0 1   my $class = shift;
23 0           my %options = @_;
24 0   0       my $debug = delete $options{debug} || 0;
25              
26 0 0         require Data::Dumper if $debug;
27              
28 0           my $self = $class->SUPER::new(@_);
29              
30             # we don't need Furl
31 0           delete $self->{furl};
32              
33 0           $self->{debug} = $debug;
34              
35             # default timeout is a bit short, raise it
36 0   0       $self->{timeout} = $options{timeout} || 30;
37              
38             # For filehandle upload support
39 0   0       $self->{chunksize} = $options{chunksize} || 4096;
40              
41             $self->{ua_opts} = {
42             map {
43 0           exists $options{$_} ? (
44 0 0         $_ => $options{ $_ }
45             ) : ()
46             } UA_PASSTHROUGH_OPTIONS
47             };
48              
49 0           $self->_create_ua;
50              
51 0           return $self;
52             }
53              
54             # Code below copied and modified for LWP from Net::Hadoop::WebHDFS
55             #
56             sub request {
57 0     0 0   my ( $self, $host, $port, $method, $path, $op, $params, $payload, $header ) = @_;
58              
59 0 0         my $request_path = $op ? $self->build_path( $path, $op, %$params ) : $path;
60              
61             # Note: ugly things done with URI, which is already used in the parent
62             # module. So we re-parse the path produced there. yuk.
63 0           my $uri = URI->new( $request_path, 'http' );
64              
65 0           $uri->host($host);
66 0           $uri->port($port);
67 0           $uri->scheme('http'); # no ssl for webhdfs? check the docs
68              
69 0 0         printf STDERR "URI : %s\n", $uri if $self->{debug};
70              
71 0           my $req;
72              
73 0 0 0       if ( $payload && openhandle($payload) ) {
    0          
74             $req = HTTP::Request::StreamingUpload->new(
75             $method => $uri,
76             fh => $payload,
77             headers => HTTP::Headers->new( 'Content-Length' => -s $payload, ),
78             chunk_size => $self->{chunksize},
79 0           );
80             }
81             elsif ( ref $payload ) {
82 0           croak __PACKAGE__ . " does not accept refs as content, only scalars and FH";
83             }
84             else {
85 0           $req = HTTP::Request->new( $method => $uri );
86 0           $req->content($payload);
87             }
88              
89 0 0         if ( is_arrayref( $header ) ) {
90 0           while ( my ( $h_field, $h_value ) = splice( @{ $header }, 0, 2 ) ) {
  0            
91 0           $req->header( $h_field => $h_value );
92             }
93             }
94              
95 0           my $real_res = $self->{ua}->request($req);
96              
97 0           my $res = { code => $real_res->code, body => $real_res->decoded_content };
98 0           my $code = $real_res->code;
99              
100 0 0         printf STDERR "HTTP code : %s\n", $code if $self->{debug};
101              
102 0           my $headers = $real_res->headers;
103              
104 0 0         printf STDERR "Headers: %s", Data::Dumper::Dumper $headers if $self->{debug};
105              
106 0 0         for my $h_key ( keys %{ $headers || {} } ) {
  0            
107 0           my $h_value = $headers->{$h_key};
108              
109 0 0         if ( $h_key =~ m!^location$!i ) { $res->{location} = $h_value; }
  0 0          
110 0           elsif ( $h_key =~ m!^content-type$!i ) { $res->{content_type} = $h_value; }
111             }
112              
113 0 0 0       return $res if $res->{code} >= 200 and $res->{code} <= 299;
114 0 0 0       return $res if $res->{code} >= 300 and $res->{code} <= 399;
115              
116 0   0       my $errmsg = $res->{body} || 'Response body is empty...';
117 0           $errmsg =~ s/\n//g;
118              
119             # Attempt to strigfy the HTML message
120 0 0         if ( $errmsg =~ m{ \A }xmsi ) {
121 0 0         if ( my @errors = $self->_parse_error_from_html( $errmsg ) ) {
122             # @error can also be assigned to a hash as it is mapped
123             # to kay=>value pairs, however strigifying the message
124             # is enough for now
125 0           my @flat;
126 0           while ( my ( $key, $val ) = splice( @errors, 0, 2 ) ) {
127 0           push @flat, "$key: $val"
128             }
129             # reset to something meaningful now that we've removed the html cruft
130 0           $errmsg = join '. ', @flat;
131             }
132             }
133              
134 0 0         if ( $code == 400 ) {
    0          
    0          
    0          
    0          
135 0           croak "ClientError: $errmsg";
136             }
137             elsif ( $code == 401 ) {
138             # this error happens for secure clusters when using Net::Hadoop::WebHDFS,
139             # but LWP::Authen::Negotiate takes care of it transparently in this module.
140             # we still may get this error on a secure cluster, when the credentials
141             # cache hasn't been initialized
142             my $extramsg = ( $headers->{'www-authenticate'} || '' ) eq 'Negotiate'
143 0 0 0       ? eval { require LWP::Authen::Negotiate; 1; }
  0 0          
  0            
144             ? q{ (Did you forget to run kinit?)}
145             : q{ (LWP::Authen::Negotiate doesn't seem available)}
146             : '';
147 0           croak "SecurityError$extramsg: $errmsg";
148             }
149             elsif ( $code == 403 ) {
150 0 0         if ( $errmsg =~ m{ \Qorg.apache.hadoop.ipc.StandbyException\E }xms ) {
151 0 0 0       if ( $self->{httpfs_mode} || not defined( $self->{standby_host} ) ) {
    0          
152              
153             # failover is disabled
154             }
155             elsif ( $self->{retrying} ) {
156              
157             # more failover is prohibited
158 0           $self->{retrying} = 0;
159             }
160             else {
161 0           $self->{under_failover} = not $self->{under_failover};
162 0           $self->{retrying} = 1;
163 0           my ( $next_host, $next_port ) = $self->connect_to;
164 0           my $val = $self->request(
165             $next_host,
166             $next_port,
167             $method,
168             $path,
169             $op,
170             $params,
171             $payload,
172             $header,
173             );
174 0           $self->{retrying} = 0;
175 0           return $val;
176             }
177             }
178 0           croak "IOError: $errmsg";
179             }
180             elsif ( $code == 404 ) {
181 0           croak "FileNotFoundError: $errmsg";
182             }
183             elsif ( $code == 500 ) {
184 0           croak "ServerError: $errmsg";
185             }
186             else {
187             # do nothing
188             }
189              
190             # catch-all exception
191 0           croak "RequestFailedError, code:$code, message:$errmsg";
192             }
193              
194             sub _create_ua {
195 0     0     my $self = shift;
196 0           my $class = ref $self;
197              
198             $self->{ua} = LWP::UserAgent->new(
199 0           %{ $self->{ua_opts} }
  0            
200             );
201              
202             $self->{ua}->agent(
203 0   0       sprintf "%s %s",
204             $class,
205             $class->VERSION || 'beta',
206             );
207              
208 0           $self->{useragent} = $self->{ua}->agent;
209 0           $self->{ua}->timeout( $self->{timeout} );
210              
211 0           return $self;
212             }
213              
214             sub _parse_error_from_html {
215             # This is a brittle function as it assumes certain things to be present
216             # in the HTML output and will most likely break with future updates.
217             # However the interface returns HTML in certain cases (like secure clusters)
218             # and currently that's a failure on the backend where we can;t fix things.
219             #
220             # In any case, the program should default to the original message fetched,
221             # if this fails for any reason.
222             #
223 0     0     my $self = shift;
224 0           my $errmsg = shift;
225              
226 0 0         if ( ! eval { require HTML::Parser;} ) {
  0            
227 0 0         if ( $self->{debug} ) {
228 0           printf STDERR "Tried to parse the HTML error message but HTML::Parser is not available!\n";
229             }
230 0           return;
231             }
232              
233 0           my @errors;
234 0           my $p = HTML::Parser->new(
235             api_version => 3,
236             handlers => {
237             text => [
238             \@errors,
239             'event,text',
240             ],
241             }
242             );
243 0           $p->parse( $errmsg );
244              
245             my @flat = map {;
246 0           s{ \A \s+ }{}xmsg;
247 0           s{ \s+ \z }{}xmsg;
248 0           $_;
249             }
250             grep {
251 0   0       $_ !~ m{ \Q