File Coverage

blib/lib/Net/Hadoop/YARN/Roles/Common.pm
Criterion Covered Total %
statement 47 139 33.8
branch 0 52 0.0
condition 0 31 0.0
subroutine 16 23 69.5
pod n/a
total 63 245 25.7


line stmt bran cond sub pod time code
1             package Net::Hadoop::YARN::Roles::Common;
2             $Net::Hadoop::YARN::Roles::Common::VERSION = '0.203';
3 5     5   24035 use strict;
  5         10  
  5         122  
4 5     5   21 use warnings;
  5         7  
  5         99  
5 5     5   45 use 5.10.0;
  5         15  
6              
7 5     5   20 use Moo::Role;
  5         7  
  5         23  
8              
9 5     5   1525 use Data::Dumper;
  5         20  
  5         222  
10 5     5   1883 use HTTP::Request;
  5         88453  
  5         129  
11 5     5   2743 use JSON::XS;
  5         20291  
  5         237  
12 5     5   1492 use HTML::PullParser;
  5         22110  
  5         148  
13 5     5   2761 use LWP::UserAgent;
  5         99495  
  5         160  
14 5     5   2111 use Regexp::Common qw( net );
  5         10581  
  5         19  
15 5     5   12359 use Scalar::Util qw( blessed );
  5         10  
  5         209  
16 5     5   2242 use Socket;
  5         15624  
  5         1814  
17 5     5   29 use Carp;
  5         10  
  5         205  
18 5     5   1680 use Text::Trim qw( trim );
  5         2051  
  5         222  
19 5     5   34 use URI;
  5         11  
  5         96  
20 5     5   1963 use XML::LibXML::Simple;
  5         211654  
  5         7320  
21              
22             has no_http_redirect => (
23             is => 'rw',
24             default => sub { 0 },
25             lazy => 1,
26             );
27              
28             has _json => (
29             is => 'rw',
30             lazy => 1,
31             default => sub {
32             return JSON::XS->new->pretty(1)->canonical(1);
33             },
34             isa => sub {
35             my $json = shift;
36             if ( ! blessed $json
37             || ! $json->isa('JSON::XS')
38             || ! $json->can('decode')
39             ) {
40             die "Not a JSON object"
41             }
42             },
43             );
44              
45             has debug => (
46             is => 'rw',
47             default => sub { $ENV{NET_HADOOP_YARN_DEBUG} || 0 },
48             isa => sub { die 'debug should be an integer' if $_[0] !~ /^[0-9]$/ },
49             lazy => 1,
50             );
51              
52             has ua => (
53             is => 'rw',
54             default => sub {
55             return LWP::UserAgent->new(
56             env_proxy => 0,
57             timeout => $_[0]->timeout,
58             ( $_[0]->no_http_redirect ? (
59             max_redirect => 0,
60             ):()),
61             );
62             },
63             isa => sub {
64             my $ua = shift;
65             if ( ! blessed( $ua ) || ! $ua->isa("LWP::UserAgent") ) {
66             die "'ua' isn't a LWP::UserAgent";
67             }
68             },
69             lazy => 1,
70             );
71              
72             has timeout => (
73             is => 'rw',
74             default => sub {30},
75             lazy => 1,
76             isa => sub {
77             if ( $_[0] !~ /^[0-9]+$/ || $_[0] <= 0 ) {
78             die "timeout must be an integer"
79             }
80             },
81             );
82              
83             has servers => (
84             is => 'rw',
85             isa => sub {
86             die "Incorrect server list" if ! _check_servers(@_);
87             },
88             lazy => 1,
89             );
90              
91             has add_host_key => (
92             is => 'rw',
93             default => sub { 0 },
94             lazy => 1,
95             );
96              
97             has host_key => (
98             is => 'rw',
99             default => sub { '__RESTHost' },
100             lazy => 1,
101             );
102              
103             sub _check_host {
104 0     0     my $host = shift;
105             return !!( eval { inet_aton($host) }
106             || $host =~ $RE{net}{IPv4}
107 0   0       || $host =~ $RE{net}{IPv6} );
108             }
109              
110             sub _check_servers {
111 0     0     for my $server (@{+shift}) {
  0            
112 0           my ($host, $port) = split /:/, $server, 2;
113 0 0 0       if ( ! _check_host($host)
      0        
      0        
114             || $port !~ /^[0-9]+$/
115             || $port < 1
116             || $port > 19888
117             ) {
118 0           die "server $server bad host (port=$port)";
119             }
120             }
121 0           return 1;
122             }
123              
124             sub _mk_uri {
125 0     0     my $self = shift;
126 0           my ( $server, $path, $params ) = @_;
127 0           my $uri = $server . "/ws/v1/" . $path;
128 0           $uri =~ s#//+#/#g;
129 0           $uri = URI->new("http://" . $uri);
130 0 0         if ( $params ) {
131 0           $uri->query_form($params);
132             }
133 0           return $uri;
134             }
135              
136             # http://hadoop.apache.org/docs/r2.2.0/hadoop-yarn/hadoop-yarn-site/WebServicesIntro.html
137              
138             sub _get {
139 0     0     shift->_request( 'GET', @_ );
140             }
141              
142             sub _put {
143 0     0     shift->_request( 'PUT', @_ );
144             }
145              
146             sub _post {
147 0     0     shift->_request( 'POST', @_ );
148             }
149              
150             sub _request {
151 0     0     my $self = shift;
152 0           my ( $method, $path, $extra, $server ) = @_;
153              
154 0           my $host_key = $self->host_key;
155 0 0         my @servers = $server ? ( $server ) : @{ $self->servers };
  0            
156 0           my $maxtries = @servers;
157              
158 0           my ($eval_error, $ret);
159 0           my $n = 0;
160              
161             # get a copy, don't mess with the global setting
162             #
163 0           my @banned_servers;
164             my $selected_server;
165              
166 0           my $e_non_html = "Response doesn't look like XML: ";
167              
168 0           TRY: while ( $n < $maxtries ) {
169 0           my $redo;
170              
171 0           $n++;
172              
173 0 0         if ( ! @servers ) {
174 0 0         $eval_error = sprintf "No servers left in the queue. Banned servers: '%s'",
175             @banned_servers
176             ? join( q{', '}, @banned_servers)
177             : '[none]',
178             ;
179 0           last TRY;
180             }
181              
182 0           $selected_server = $servers[0];
183             eval {
184 0           $eval_error = undef;
185              
186             my $uri = $self->_mk_uri(
187             $selected_server,
188             $path,
189 0 0         $method eq 'GET' ? $extra->{params} : (),
190             );
191              
192 0 0         print STDERR "====> $uri\n" if $self->debug;
193              
194 0           my $req = HTTP::Request->new( uc($method), $uri );
195 0           $req->header( "Accept-Encoding", "gzip" );
196             #$req->header( "Accept", "application/json" );
197 0           $req->header( "Accept", "application/xml" );
198              
199 0           my $response = $self->ua->request($req);
200              
201 0 0         if ( $response->code == 500 ) {
    0          
202 0           die "Bad request: $uri";
203             } elsif ( $response->code == 401 ) {
204             my $extramsg = ( $response->headers->{'www-authenticate'} || '' ) eq 'Negotiate'
205 0 0 0       ? eval { require LWP::Authen::Negotiate; 1; }
  0 0          
  0            
206             ? q{ (Did you forget to run kinit?) }
207             : q{ (LWP::Authen::Negotiate doesn't seem available) }
208             : '';
209 0           croak "SecurityError$extramsg";
210             }
211              
212             # found out the json support is buggy at least in the scheduler
213             # info (overwrites child queues instead of making a list), revert
214             # to XML (see YARN-2336)
215              
216 0           my $res;
217             eval {
218 0   0       my $content = $response->decoded_content
219             || die 'No response from the server!';
220              
221 0 0         if ( $content !~ m{ \A ( \s+ )? <[?]xml }xms ) {
222 0 0         if ( $content =~ m{
223             \QThis is standby RM. Redirecting to the current active RM\E
224             }xms ) {
225 0           push @banned_servers, shift @servers;
226 0           $redo++;
227 0           die "Hit the standby with $selected_server";
228             }
229 0           die $e_non_html . $content;
230             }
231              
232 0   0       $res = XMLin(
233             $content,
234             KeepRoot => 0,
235             KeyAttr => [],
236             ForceArray => [qw(
237             app
238             appAttempt
239             container
240             counterGroup
241             job
242             jobAttempt
243             task
244             taskAttempt
245             )],
246             ) || die "Failed to parse XML!";
247 0           1;
248 0 0         } or do {
249 0           my $is_html = $response->content_type eq 'text/html';
250 0   0       my $decode_error = $@ || 'Zombie error';
251              
252 0 0         if ( $is_html ) {
253 0           (my $str_to_parse = $decode_error) =~ s{ \Q$e_non_html\E }{}xms;
254 0   0       my $parser = HTML::PullParser->new(
255             doc => \$str_to_parse,
256             text => 'dtext',
257             ) || Carp::confess "Can't parse HTML received from the API: $!";
258 0           my %link;
259             my @txt_error;
260 0           while ( my $token = $parser->get_token ) {
261 0 0         my $txt = trim $token->[0] or next;
262 0           push @txt_error, $txt;
263             }
264 0           $decode_error = 'Decoded error: ' . join q{ }, @txt_error;
265             };
266              
267 0           my $will_fail_again = $decode_error =~ m{
268             \Qcould not be found, please try the history server\E
269             }xms;
270              
271 0 0         $n = $maxtries if $will_fail_again;
272              
273             # when redirected to the history server, a bug present in hadoop 2.5.1
274             # sends to an HTML page, ignoring the Accept-Type header
275 0 0         my $msg = $response->redirects
276             ? q{server response wasn't valid (possibly buggy redirect to HTML instead of JSON or XML)}
277             : q{server response wasn't valid JSON or XML}
278             ;
279              
280 0           die "$msg - $uri ($n/$maxtries): $decode_error";
281             };
282              
283 0 0         print STDERR Dumper $res if $self->debug;
284              
285 0 0         if ( $response->is_success ) {
286 0           $ret = $res;
287 0           return 1;
288             }
289              
290 0           my $e = $res->{RemoteException};
291              
292             die sprintf "%s (%s in %s) for URI: %s",
293             $e->{message} || $res->{message} || '[unknown message]',
294             $e->{exception} || $res->{exception} || '[unknown exception]',
295 0   0       $e->{javaClassName} || $res->{javaClassName} || '[unknown javaClassName]',
      0        
      0        
296             $uri,
297             ;
298              
299 0           1;
300 0 0         } or do {
301             # store the error for later; will be displayed if this is the last
302             # iteration. also use the next server in the list in case of retry,
303             # or reset the list for the next call (we went a full circle)
304 0   0       $eval_error = $@ || 'Zombie error';
305 0 0         redo TRY if $redo;
306 0 0         push @servers, shift @servers if @servers > 1;
307             };
308              
309 0 0         if ( $ret ) {
310 0 0         if ( $self->add_host_key ) {
311             # mark where we've been
312 0           $ret->{ $host_key } = $selected_server;
313             }
314 0           last TRY;
315             }
316              
317             } # retry as many times as there are servers
318              
319 0 0         if ( $eval_error ) {
320 0           die "Final error ($n/$maxtries): $eval_error";
321             }
322              
323 0           return $ret;
324             }
325              
326             1;
327              
328             __END__