File Coverage

blib/lib/Net/Hadoop/YARN/ResourceManager/Scheduler/UserApps.pm
Criterion Covered Total %
statement 27 29 93.1
branch n/a
condition n/a
subroutine 10 10 100.0
pod n/a
total 37 39 94.8


line stmt bran cond sub pod time code
1             package Net::Hadoop::YARN::ResourceManager::Scheduler::UserApps;
2             $Net::Hadoop::YARN::ResourceManager::Scheduler::UserApps::VERSION = '0.201';
3 1     1   23322 use 5.10.0;
  1         3  
4 1     1   4 use strict;
  1         1  
  1         18  
5 1     1   3 use warnings;
  1         1  
  1         22  
6              
7 1     1   3 use Data::Dumper ();
  1         2  
  1         14  
8 1     1   463 use Moo;
  1         10402  
  1         5  
9 1     1   1436 use POSIX ();
  1         4832  
  1         21  
10 1     1   5 use Ref::Util ();
  1         1  
  1         11  
11 1     1   3 use Scalar::Util ();
  1         0  
  1         10  
12 1     1   468 use Time::Duration ();
  1         1334  
  1         19  
13 1     1   409 use Net::Hadoop::YARN::ResourceManager;
  0            
  0            
14              
15             has rm_object => (
16             is => 'rw',
17             isa => sub {
18             my $thing = shift;
19             my $type = 'Net::Hadoop::YARN::ResourceManager';
20             if ( ! $thing
21             || ! Scalar::Util::blessed $thing
22             || ! $thing->isa( $type )
23             ) {
24             die "rm_object is not a $type";
25             }
26             },
27             default => sub {
28             Net::Hadoop::YARN::ResourceManager->new(
29             ( $ENV{YARN_RESOURCE_MANAGER} ? (
30             servers => [ split /,/, $ENV{YARN_RESOURCE_MANAGER} ]
31             ) : () )
32             );
33             },
34             );
35              
36             sub collect {
37             my $self = shift;
38             my $user = shift || die "No user name was specified";
39              
40             my $apps = $self->rm_object->apps( user => $user );
41              
42             if ( ! Ref::Util::is_arrayref $apps ) {
43             if ( Ref::Util::is_hashref $apps ) {
44             if ( my $check = $apps->{apps} ) {
45             if ( ! keys %{ $check }) {
46             $apps = [];
47             }
48             else {
49             die sprintf "[TODO-1] Don't know what to do with %s",
50             Data::Dumper::Dumper [ $user => $apps ],
51             ;
52             }
53             }
54             }
55             else {
56             die sprintf "[TODO-2] Don't know what to do with %s",
57             Data::Dumper::Dumper [ $user => $apps ],
58             ;
59             }
60             }
61              
62             my $format_epoch = sub {
63             my $epoch = shift || die "No epoch specified!";
64             return POSIX::strftime "%a %b %d %Y %H:%M:%S %Z", localtime $epoch;
65             };
66              
67             my %apps_by_state;
68              
69             foreach my $app ( @{ $apps } ) {
70             foreach my $resource ( qw(
71             allocatedMB
72             allocatedVCores
73             ) ) {
74             $app->{ $resource } = 0 if $app->{ $resource } eq '-1';
75             }
76              
77             if ( $app->{allocatedMB} ) {
78             $app->{allocatedMB_fmt} = $self->format_bytes( $app->{allocatedMB} * 1024**2 );
79             }
80              
81             if ( $app->{allocatedVCores} ) {
82             $app->{allocatedVCores_fmt} = sprintf '%s vCore%s',
83             $app->{allocatedVCores},
84             $app->{allocatedVCores} > 1 ? 's' : '',
85             ;
86             }
87              
88             # TODO
89             # [STRING]"applicationTags"
90             # the value is something like "oozie-59a27f107d250c9822fd45e87fd40db8"
91             # which is not the job id.
92              
93             foreach my $hash_or_string ( qw(
94             diagnostics
95             applicationTags
96             )) {
97             next if ! exists $app->{ $hash_or_string };
98             # This is a bug in the REST layer
99             if ( Ref::Util::is_hashref $app->{ $hash_or_string }
100             && ! keys %{ $app->{ $hash_or_string } }
101             ) {
102             $app->{ $hash_or_string } = '';
103             }
104             }
105              
106             # https://www.cloudera.com/documentation/enterprise/latest/topics/cm_dg_yarn_applications.html
107             foreach my $duration_field ( qw(
108             vcoreSeconds
109             elapsedTime
110             memorySeconds
111             ) ) {
112             next if ! exists $app->{ $duration_field };
113             if ( $app->{ $duration_field } ) {
114             $app->{ $duration_field . '_fmt' } = Time::Duration::duration(
115             $duration_field eq 'elapsedTime'
116             ? $app->{ $duration_field } / 1000
117             : $app->{ $duration_field }
118             );
119             }
120             }
121              
122             foreach my $time_field ( qw(
123             finishedTime
124             startedTime
125             ) ) {
126             next if ! exists $app->{ $time_field };
127             if ( $app->{ $time_field } ) {
128             $app->{ $time_field . '_fmt' } = $format_epoch->( $app->{ $time_field } / 1000);
129             }
130             }
131              
132             if ( $app->{name} =~ m{ \Q-oozie-oozi-W\E \z }xms ) {
133             my %name = map { @{ $_ } > 1 ? @{ $_ } : ( $_->[0] => 1 ) }
134             map { [ split m{ [=] }xms, $_, 2 ] }
135             split m{ [:] }xms, $app->{name};
136             $name{workflow_name} = delete $name{W} if $name{W};
137             $name{action_name} = delete $name{A} if $name{A};
138             $name{action_type} = delete $name{T} if $name{T};
139             $name{id} = delete $name{ID} if $name{ID};
140             $app->{oozie_meta} = \%name;
141             $app->{oozie_id} = $name{id} if $name{id};
142             }
143              
144             push @{ $apps_by_state{ $app->{state} } ||= [] }, $app;
145             }
146              
147             my %total_res;
148             foreach my $app ( @{ $apps_by_state{RUNNING} }) {
149             $total_res{allocatedMB} += $app->{allocatedMB};
150             $total_res{allocatedVCores} += $app->{allocatedVCores};
151             }
152              
153             if ( $total_res{allocatedMB} ) {
154             $total_res{allocatedMB_fmt} = $self->format_bytes( $total_res{allocatedMB} * 1024**2 );
155             }
156              
157             if ( $total_res{allocatedVCores} ) {
158             $total_res{allocatedVCores_fmt} = sprintf '%s vCore%s',
159             $total_res{allocatedVCores},
160             $total_res{allocatedVCores} > 1 ? 's' : '',
161             ;
162             }
163              
164             my @grouped;
165             foreach my $ordered_state (qw(
166             RUNNING
167             ACCEPTED
168             FINISHED
169             KILLED
170             FAILED
171             )) {
172             push @grouped, {
173             state => lc( $ordered_state ),
174             state_fmt => ucfirst( lc $ordered_state ),
175             apps => delete( $apps_by_state{$ordered_state} ) || [],
176             },
177             }
178              
179             # TODO: possibly needs to be removed if we are sure that the code above
180             # is handling all of the possible states. So, this is a "just in case" part
181             #
182             push @grouped, {
183             state => 'rest',
184             apps => [ map { @{ $_ } } values %apps_by_state ],
185             };
186              
187             # Spark jobs are returned like this for whatever reason.
188             if ( my $apps = $grouped[-1]->{apps} ) {
189             if ( Ref::Util::is_arrayref $apps && Ref::Util::is_arrayref $apps->[0] ) {
190             $grouped[-1]->{apps} = [ @{ $apps->[0] } ];
191             }
192             }
193              
194             return {
195             grouped_apps => [ grep { @{ $_->{apps} } > 0 } @grouped ],
196             total_apps => scalar @{ $apps },
197             resources => \%total_res,
198             user => $user,
199             };
200             }
201              
202             sub format_bytes {
203             my $self = shift;
204             my $bytes = shift;
205             return sprintf '%.2f GB', $bytes / 1024**3;
206             }
207              
208             1;
209              
210             __END__