File Coverage

blib/lib/Net/Hadoop/YARN/ResourceManager.pm
Criterion Covered Total %
statement 20 92 21.7
branch 0 44 0.0
condition 0 35 0.0
subroutine 7 16 43.7
pod 8 8 100.0
total 35 195 17.9


line stmt bran cond sub pod time code
1             package Net::Hadoop::YARN::ResourceManager;
2             $Net::Hadoop::YARN::ResourceManager::VERSION = '0.203';
3 4     4   88833 use strict;
  4         17  
  4         98  
4 4     4   16 use warnings;
  4         7  
  4         75  
5 4     4   39 use 5.10.0;
  4         11  
6              
7 4     4   20 use Data::Dumper;
  4         5  
  4         151  
8 4     4   476 use Moo;
  4         9227  
  4         38  
9 4         252 use Ref::Util qw(
10             is_ref
11             is_arrayref
12             is_hashref
13 4     4   2822 );
  4         2570  
14 4         3671 use Scalar::Util qw(
15             refaddr
16 4     4   24 );
  4         6  
17              
18             with 'Net::Hadoop::YARN::Roles::Common';
19              
20             has '+servers' => (
21             default => sub { ["localhost:8088"] },
22             );
23              
24             has '+add_host_key' => ( default => sub { 1 } );
25              
26             # After CDH 5.8.2, The RM properly issues a 302 redirect, but losing any query
27             # string in the original request, thus making the filters and any other parameter
28             # a no-op and returning huge responses including "everything" instead of a subset.
29             #
30             # eg: { queue: "root.whatever" } filter will be lost after the redirect.
31             #
32             # We can't use active_rm() in here or in the Common role, as it will trigger a
33             # deep recursion due to the fact that the HTTP calls are shared from there.
34             #
35             # Possibly needs to be revisited again in the future.
36             #
37             has '+no_http_redirect' => ( default => sub { 1 } );
38              
39             sub active_rm {
40 0     0 1   my $self = shift;
41 0 0         my $opt = is_hashref $_[0] ? shift @_ : {};
42 0           my $rv;
43              
44 0           foreach my $server ( @{ $self->servers } ) {
  0            
45 0           my $info = $self->info({ server => $server });
46 0   0       my $haState = $info->{haState} || next;
47 0 0         if ( $haState eq 'ACTIVE' ) {
48 0           $rv = $server;
49 0           last;
50             }
51             }
52              
53 0 0         if ( ! $rv ) {
54             die sprintf "Failed to locate the active YARN Resource Manager from these hosts: %s",
55 0           join( q{, }, @{ $self->servers } ),
  0            
56             ;
57             }
58              
59 0 0         if ( $opt->{hostname_only} ) {
60 0           return +( split m{[:]}xms, $rv )[0];
61             }
62              
63 0           return $rv;
64             }
65              
66             sub info {
67 0     0 1   my $self = shift;
68 0 0         my $opt = is_hashref $_[0] ? shift @_ : {};
69              
70             my $res = $self->_get(
71             "cluster/info",
72             undef,
73 0   0       ( $opt->{server} or () ),
74             );
75              
76             return $self->_apply_host_key(
77             $res,
78 0   0       $res->{clusterInfo} || $res,
79             );
80             }
81              
82             sub metrics {
83 0     0 1   my $self = shift;
84 0 0         my $opt = is_hashref $_[0] ? shift @_ : {};
85             my $res = $self->_get(
86             "cluster/metrics",
87             undef,
88 0   0       ( $opt->{server} or () ),
89             );
90              
91             return $self->_apply_host_key(
92             $res,
93 0   0       $res->{clusterMetrics} || $res,
94             );
95             }
96              
97             sub scheduler {
98 0     0 1   my $self = shift;
99 0           my $res = $self->_get("cluster/scheduler");
100             return $self->_apply_host_key(
101             $res,
102 0   0       $res->{schedulerInfo} || $res,
103             );
104             }
105              
106             sub apps {
107 0     0 1   my $self = shift;
108 0           my $app_id;
109             my $options;
110 0 0         if ( @_ == 1 ) {
    0          
111 0 0         if ( !ref $_[0] ) {
112 0           $app_id = shift;
113             }
114             else {
115 0           $options = shift;
116             }
117             }
118             elsif ( @_ > 1 ) {
119 0           $options = {@_};
120             }
121 0 0         my $res = $self->_get(
122             $app_id ? "cluster/apps/$app_id" : ( "cluster/apps", { params => $options } )
123             );
124              
125             return $self->_apply_host_key(
126             $res,
127 0   0       $res->{apps}{app} || $res->{app} || $res,
128             );
129             }
130              
131             sub appattempts {
132 0     0 1   my $self = shift;
133 0 0         my $app_id = shift or die "No app ID provided";
134 0           my $res = $self->_get( "cluster/apps/$app_id/appattempts" );
135 0           return $res;
136             }
137              
138             # TODO check all states and add filter (validation)
139              
140             sub appstatistics {
141 0     0 1   my $self = shift;
142 0           my $options;
143 0 0 0       if ( @_ == 1 && ref $_[0] ) {
    0          
144 0           $options = shift;
145             }
146             elsif ( @_ > 1 ) {
147 0           $options = {@_};
148             }
149 0 0         my $res = $self->_get(
150             "cluster/appstatistics",
151             ( $options ? {
152             params => $options,
153             } : () ),
154             );
155              
156 0 0         if ($res) {
157             return $self->_apply_host_key(
158             $res,
159             $res->{appStatInfo}{statItem} || $res->{statItem},
160 0   0       );
161             }
162              
163 0           return;
164             }
165              
166             sub nodes {
167 0     0 1   my $self = shift;
168 0           my $node_id;
169             my $options;
170 0 0         if ( @_ == 1 ) {
    0          
171 0 0         if ( !ref $_[0] ) {
172 0           $node_id = shift;
173             }
174             else {
175 0           $options = shift;
176             }
177             }
178             elsif ( @_ > 1 ) {
179 0           $options = {@_};
180             }
181 0 0         my $res = $self->_get(
182             $node_id ? "cluster/nodes/$node_id"
183             : (
184             "cluster/nodes",
185             { params => $options },
186             )
187             );
188              
189              
190             return $self->_apply_host_key(
191             $res,
192 0   0       $res->{nodes}{node} || $res->{node} || $res,
193             );
194              
195             }
196              
197             sub _apply_host_key {
198 0     0     my $self = shift;
199 0           my $res = shift;
200 0           my $rv = shift;
201              
202              
203 0 0 0       if ( is_ref( $res )
      0        
204             && is_ref( $rv )
205             && ( refaddr $res eq refaddr $rv )
206             ) {
207 0           return $rv;
208             }
209              
210 0           my $host_key = $self->host_key;
211 0           my $this_host = $res->{ $host_key };
212              
213 0 0         if ( is_hashref $rv ) {
    0          
214 0           $rv->{ $host_key } = $this_host;
215             }
216             elsif ( is_arrayref $rv ) {
217 0           foreach my $e ( @{ $rv } ) {
  0            
218 0           $e->{ $host_key } = $this_host;
219             }
220             }
221             else {
222 0           die "Got unknown data: $rv";
223             }
224              
225 0           return $rv;
226             }
227              
228             1;
229              
230             __END__