File Coverage

blib/lib/Net/Hadoop/YARN/ResourceManager/Scheduler/UserApps.pm
Criterion Covered Total %
statement 29 105 27.6
branch 0 54 0.0
condition 0 14 0.0
subroutine 10 13 76.9
pod 2 2 100.0
total 41 188 21.8


line stmt bran cond sub pod time code
1             package Net::Hadoop::YARN::ResourceManager::Scheduler::UserApps;
2             $Net::Hadoop::YARN::ResourceManager::Scheduler::UserApps::VERSION = '0.203';
3 1     1   86666 use 5.10.0;
  1         10  
4 1     1   5 use strict;
  1         2  
  1         16  
5 1     1   4 use warnings;
  1         2  
  1         27  
6              
7 1     1   5 use Data::Dumper ();
  1         2  
  1         12  
8 1     1   417 use Moo;
  1         9176  
  1         4  
9 1     1   1187 use POSIX ();
  1         2  
  1         11  
10 1     1   4 use Ref::Util ();
  1         1  
  1         10  
11 1     1   4 use Scalar::Util ();
  1         1  
  1         10  
12 1     1   388 use Time::Duration ();
  1         1486  
  1         21  
13 1     1   352 use Net::Hadoop::YARN::ResourceManager;
  1         4  
  1         923  
14              
15             has rm_object => (
16             is => 'rw',
17             isa => sub {
18             my $thing = shift;
19             my $type = 'Net::Hadoop::YARN::ResourceManager';
20             if ( ! $thing
21             || ! Scalar::Util::blessed $thing
22             || ! $thing->isa( $type )
23             ) {
24             die "rm_object is not a $type";
25             }
26             },
27             default => sub {
28             Net::Hadoop::YARN::ResourceManager->new(
29             ( $ENV{YARN_RESOURCE_MANAGER} ? (
30             servers => [ split /,/, $ENV{YARN_RESOURCE_MANAGER} ]
31             ) : () )
32             );
33             },
34             );
35              
36             sub collect {
37 0     0 1   my $self = shift;
38 0   0       my $user = shift || die "No user name was specified";
39              
40 0           my $apps = $self->rm_object->apps( user => $user );
41              
42 0 0         if ( ! Ref::Util::is_arrayref $apps ) {
43 0 0         if ( Ref::Util::is_hashref $apps ) {
44 0 0         if ( my $check = $apps->{apps} ) {
45 0 0         if ( ! keys %{ $check }) {
  0            
46 0           $apps = [];
47             }
48             else {
49 0           die sprintf "[TODO-1] Don't know what to do with %s",
50             Data::Dumper::Dumper [ $user => $apps ],
51             ;
52             }
53             }
54             }
55             else {
56 0           die sprintf "[TODO-2] Don't know what to do with %s",
57             Data::Dumper::Dumper [ $user => $apps ],
58             ;
59             }
60             }
61              
62             my $format_epoch = sub {
63 0   0 0     my $epoch = shift || die "No epoch specified!";
64 0           return POSIX::strftime "%a %b %d %Y %H:%M:%S %Z", localtime $epoch;
65 0           };
66              
67 0           my %apps_by_state;
68              
69 0           foreach my $app ( @{ $apps } ) {
  0            
70 0           foreach my $resource ( qw(
71             allocatedMB
72             allocatedVCores
73             ) ) {
74 0 0         $app->{ $resource } = 0 if $app->{ $resource } eq '-1';
75             }
76              
77 0 0         if ( $app->{allocatedMB} ) {
78 0           $app->{allocatedMB_fmt} = $self->format_bytes( $app->{allocatedMB} * 1024**2 );
79             }
80              
81 0 0         if ( $app->{allocatedVCores} ) {
82             $app->{allocatedVCores_fmt} = sprintf '%s vCore%s',
83             $app->{allocatedVCores},
84 0 0         $app->{allocatedVCores} > 1 ? 's' : '',
85             ;
86             }
87              
88             # TODO
89             # [STRING]"applicationTags"
90             # the value is something like "oozie-59a27f107d250c9822fd45e87fd40db8"
91             # which is not the job id.
92              
93 0           foreach my $hash_or_string ( qw(
94             diagnostics
95             applicationTags
96             )) {
97 0 0         next if ! exists $app->{ $hash_or_string };
98             # This is a bug in the REST layer
99 0 0 0       if ( Ref::Util::is_hashref $app->{ $hash_or_string }
100 0           && ! keys %{ $app->{ $hash_or_string } }
101             ) {
102 0           $app->{ $hash_or_string } = '';
103             }
104             }
105              
106             # https://www.cloudera.com/documentation/enterprise/latest/topics/cm_dg_yarn_applications.html
107 0           foreach my $duration_field ( qw(
108             vcoreSeconds
109             elapsedTime
110             memorySeconds
111             ) ) {
112 0 0         next if ! exists $app->{ $duration_field };
113 0 0         if ( $app->{ $duration_field } ) {
114             $app->{ $duration_field . '_fmt' } = Time::Duration::duration(
115             $duration_field eq 'elapsedTime'
116             ? $app->{ $duration_field } / 1000
117 0 0         : $app->{ $duration_field }
118             );
119             }
120             }
121              
122 0           foreach my $time_field ( qw(
123             finishedTime
124             startedTime
125             ) ) {
126 0 0         next if ! exists $app->{ $time_field };
127 0 0         if ( $app->{ $time_field } ) {
128 0           $app->{ $time_field . '_fmt' } = $format_epoch->( $app->{ $time_field } / 1000);
129             }
130             }
131              
132 0 0         if ( $app->{name} =~ m{ \Q-oozie-oozi-W\E \z }xms ) {
133 0 0         my %name = map { @{ $_ } > 1 ? @{ $_ } : ( $_->[0] => 1 ) }
  0            
  0            
134 0           map { [ split m{ [=] }xms, $_, 2 ] }
135 0           split m{ [:] }xms, $app->{name};
136 0 0         $name{workflow_name} = delete $name{W} if $name{W};
137 0 0         $name{action_name} = delete $name{A} if $name{A};
138 0 0         $name{action_type} = delete $name{T} if $name{T};
139 0 0         $name{id} = delete $name{ID} if $name{ID};
140 0           $app->{oozie_meta} = \%name;
141 0 0         $app->{oozie_id} = $name{id} if $name{id};
142             }
143              
144 0   0       push @{ $apps_by_state{ $app->{state} } ||= [] }, $app;
  0            
145             }
146              
147 0           my %total_res;
148 0           foreach my $app ( @{ $apps_by_state{RUNNING} }) {
  0            
149 0           $total_res{allocatedMB} += $app->{allocatedMB};
150 0           $total_res{allocatedVCores} += $app->{allocatedVCores};
151             }
152              
153 0 0         if ( $total_res{allocatedMB} ) {
154 0           $total_res{allocatedMB_fmt} = $self->format_bytes( $total_res{allocatedMB} * 1024**2 );
155             }
156              
157 0 0         if ( $total_res{allocatedVCores} ) {
158             $total_res{allocatedVCores_fmt} = sprintf '%s vCore%s',
159             $total_res{allocatedVCores},
160 0 0         $total_res{allocatedVCores} > 1 ? 's' : '',
161             ;
162             }
163              
164 0           my @grouped;
165 0           foreach my $ordered_state (qw(
166             RUNNING
167             ACCEPTED
168             FINISHED
169             KILLED
170             FAILED
171             )) {
172             push @grouped, {
173             state => lc( $ordered_state ),
174             state_fmt => ucfirst( lc $ordered_state ),
175 0   0       apps => delete( $apps_by_state{$ordered_state} ) || [],
176             },
177             }
178              
179             # TODO: possibly needs to be removed if we are sure that the code above
180             # is handling all of the possible states. So, this is a "just in case" part
181             #
182             push @grouped, {
183             state => 'rest',
184 0           apps => [ map { @{ $_ } } values %apps_by_state ],
  0            
  0            
185             };
186              
187             # Spark jobs are returned like this for whatever reason.
188 0 0         if ( my $apps = $grouped[-1]->{apps} ) {
189 0 0 0       if ( Ref::Util::is_arrayref $apps && Ref::Util::is_arrayref $apps->[0] ) {
190 0           $grouped[-1]->{apps} = [ @{ $apps->[0] } ];
  0            
191             }
192             }
193              
194             return {
195 0           grouped_apps => [ grep { @{ $_->{apps} } > 0 } @grouped ],
  0            
196 0           total_apps => scalar @{ $apps },
  0            
197             resources => \%total_res,
198             user => $user,
199             };
200             }
201              
202             sub format_bytes {
203 0     0 1   my $self = shift;
204 0           my $bytes = shift;
205 0           return sprintf '%.2f GB', $bytes / 1024**3;
206             }
207              
208             1;
209              
210             __END__