File Coverage

blib/lib/Net/Hadoop/YARN/Roles/Common.pm
Criterion Covered Total %
statement 36 38 94.7
branch n/a
condition n/a
subroutine 13 13 100.0
pod n/a
total 49 51 96.0


line stmt bran cond sub pod time code
1             package Net::Hadoop::YARN::Roles::Common;
2             $Net::Hadoop::YARN::Roles::Common::VERSION = '0.202';
3 5     5   36766 use strict;
  5         12  
  5         178  
4 5     5   23 use warnings;
  5         16  
  5         161  
5 5     5   76 use 5.10.0;
  5         17  
6              
7 5     5   32 use Moo::Role;
  5         15  
  5         40  
8              
9 5     5   1762 use Data::Dumper;
  5         10  
  5         393  
10 5     5   2850 use HTTP::Request;
  5         121932  
  5         190  
11 5     5   4717 use JSON::XS;
  5         30879  
  5         419  
12 5     5   3982 use LWP::UserAgent;
  5         118782  
  5         223  
13 5     5   3557 use Regexp::Common qw( net );
  5         14067  
  5         26  
14 5     5   15713 use Scalar::Util qw( blessed );
  5         10  
  5         406  
15 5     5   3774 use Socket;
  5         20411  
  5         3179  
16 5     5   55 use URI;
  5         11  
  5         179  
17 5     5   3378 use XML::LibXML::Simple;
  0            
  0            
18              
19             has no_http_redirect => (
20             is => 'rw',
21             default => sub { 0 },
22             lazy => 1,
23             );
24              
25             has _json => (
26             is => 'rw',
27             lazy => 1,
28             default => sub {
29             return JSON::XS->new->pretty(1)->canonical(1);
30             },
31             isa => sub {
32             my $json = shift;
33             if ( ! blessed $json
34             || ! $json->isa('JSON::XS')
35             || ! $json->can('decode')
36             ) {
37             die "Not a JSON object"
38             }
39             },
40             );
41              
42             has debug => (
43             is => 'rw',
44             default => sub { $ENV{NET_HADOOP_YARN_DEBUG} || 0 },
45             isa => sub { die 'debug should be an integer' if $_[0] !~ /^[0-9]$/ },
46             lazy => 1,
47             );
48              
49             has ua => (
50             is => 'rw',
51             default => sub {
52             return LWP::UserAgent->new(
53             env_proxy => 0,
54             timeout => $_[0]->timeout,
55             ( $_[0]->no_http_redirect ? (
56             max_redirect => 0,
57             ):()),
58             );
59             },
60             isa => sub {
61             my $ua = shift;
62             if ( ! blessed( $ua ) || ! $ua->isa("LWP::UserAgent") ) {
63             die "'ua' isn't a LWP::UserAgent";
64             }
65             },
66             lazy => 1,
67             );
68              
69             has timeout => (
70             is => 'rw',
71             default => sub {30},
72             lazy => 1,
73             isa => sub {
74             if ( $_[0] !~ /^[0-9]+$/ || $_[0] <= 0 ) {
75             die "timeout must be an integer"
76             }
77             },
78             );
79              
80             has servers => (
81             is => 'rw',
82             isa => sub {
83             die "Incorrect server list" if ! _check_servers(@_);
84             },
85             lazy => 1,
86             );
87              
88             has add_host_key => (
89             is => 'rw',
90             default => sub { 0 },
91             lazy => 1,
92             );
93              
94             has host_key => (
95             is => 'rw',
96             default => sub { '__RESTHost' },
97             lazy => 1,
98             );
99              
100             sub _check_host {
101             my $host = shift;
102             return !!( eval { inet_aton($host) }
103             || $host =~ $RE{net}{IPv4}
104             || $host =~ $RE{net}{IPv6} );
105             }
106              
107             sub _check_servers {
108             for my $server (@{+shift}) {
109             my ($host, $port) = split /:/, $server, 2;
110             if ( ! _check_host($host)
111             || $port !~ /^[0-9]+$/
112             || $port < 1
113             || $port > 19888
114             ) {
115             die "server $server bad host (port=$port)";
116             }
117             }
118             return 1;
119             }
120              
121             sub _mk_uri {
122             my $self = shift;
123             my ( $server, $path, $params ) = @_;
124             my $uri = $server . "/ws/v1/" . $path;
125             $uri =~ s#//+#/#g;
126             $uri = URI->new("http://" . $uri);
127             if ( $params ) {
128             $uri->query_form($params);
129             }
130             return $uri;
131             }
132              
133             # http://hadoop.apache.org/docs/r2.2.0/hadoop-yarn/hadoop-yarn-site/WebServicesIntro.html
134              
135             sub _get {
136             shift->_request( 'GET', @_ );
137             }
138              
139             sub _put {
140             shift->_request( 'PUT', @_ );
141             }
142              
143             sub _post {
144             shift->_request( 'POST', @_ );
145             }
146              
147             sub _request {
148             my $self = shift;
149             my ( $method, $path, $extra, $server ) = @_;
150              
151             my $host_key = $self->host_key;
152             my @servers = $server ? ( $server ) : @{ $self->servers };
153             my $maxtries = @servers;
154              
155             my ($eval_error, $ret, $n);
156              
157             # get a copy, don't mess with the global setting
158             #
159             my @banned_servers;
160             my $selected_server;
161              
162             TRY: for ( 1 .. $maxtries ) {
163             my $redo;
164              
165             $n++;
166              
167             if ( ! @servers ) {
168             $eval_error = sprintf "No servers left in the queue. Banned servers: '%s'",
169             @banned_servers
170             ? join( q{', '}, @banned_servers)
171             : '[none]',
172             ;
173             last TRY;
174             }
175              
176             $selected_server = $servers[0];
177             eval {
178             $eval_error = undef;
179              
180             my $uri = $self->_mk_uri(
181             $selected_server,
182             $path,
183             $method eq 'GET' ? $extra->{params} : (),
184             );
185              
186             print STDERR "====> $uri\n" if $self->debug;
187              
188             my $req = HTTP::Request->new( uc($method), $uri );
189             $req->header( "Accept-Encoding", "gzip" );
190             #$req->header( "Accept", "application/json" );
191             $req->header( "Accept", "application/xml" );
192              
193             my $response = $self->ua->request($req);
194              
195             if ( $response->code == 500 ) {
196             die "Bad request: $uri";
197             }
198              
199             # found out the json support is buggy at least in the scheduler
200             # info (overwrites child queues instead of making a list), revert
201             # to XML (see YARN-2336)
202              
203             my $res;
204             eval {
205             my $content = $response->decoded_content
206             || die 'No response from the server!';
207              
208             if ( $content !~ m{ \A ( \s+ )? <[?]xml }xms ) {
209             if ( $content =~ m{
210             \QThis is standby RM. Redirecting to the current active RM\E
211             }xms ) {
212             push @banned_servers, shift @servers;
213             $redo++;
214             die "Hit the standby with $selected_server";
215             }
216             die "Response doesn't look like XML: $content";
217             }
218              
219             $res = XMLin(
220             $content,
221             KeepRoot => 0,
222             KeyAttr => [],
223             ForceArray => [qw(
224             app
225             appAttempt
226             container
227             counterGroup
228             job
229             jobAttempt
230             task
231             taskAttempt
232             )],
233             ) || die "Failed to parse XML!";
234             1;
235             } or do {
236             # $self->_json->decode($content)
237             my $decode_error = $@ || 'Zombie error';
238              
239             # when redirected to the history server, a bug present in hadoop 2.5.1
240             # sends to an HTML page, ignoring the Accept-Type header
241             my $msg = $response->redirects
242             ? q{server response wasn't valid (possibly buggy redirect to HTML instead of JSON or XML)}
243             : q{server response wasn't valid JSON or XML}
244             ;
245              
246             die "$msg - $uri ($n/$maxtries): $decode_error";
247             };
248              
249             print STDERR Dumper $res if $self->debug;
250              
251             if ( $response->is_success ) {
252             $ret = $res;
253             return 1;
254             }
255              
256             my $e = $res->{RemoteException};
257              
258             die sprintf "%s (%s in %s) for URI: %s",
259             $e->{message} || $res->{message} || '[unknown message]',
260             $e->{exception} || $res->{exception} || '[unknown exception]',
261             $e->{javaClassName} || $res->{javaClassName} || '[unknown javaClassName]',
262             $uri,
263             ;
264              
265             1;
266             } or do {
267             # store the error for later; will be displayed if this is the last
268             # iteration. also use the next server in the list in case of retry,
269             # or reset the list for the next call (we went a full circle)
270             $eval_error = $@ || 'Zombie error';
271             redo TRY if $redo;
272             push @servers, shift @servers if @servers > 1;
273             };
274              
275             if ( $ret ) {
276             if ( $self->add_host_key ) {
277             # mark where we've been
278             $ret->{ $host_key } = $selected_server;
279             }
280             last TRY;
281             }
282              
283             } # retry as many times as there are servers
284              
285             if ( $eval_error ) {
286             die "Final error ($n/$maxtries): $eval_error";
287             }
288              
289             return $ret;
290             }
291              
292             1;
293              
294             __END__