File Coverage

blib/lib/Hadoop/HDFS/Command.pm
Criterion Covered Total %
statement 41 220 18.6
branch 0 60 0.0
condition 0 29 0.0
subroutine 14 28 50.0
pod 1 1 100.0
total 56 338 16.5


line stmt bran cond sub pod time code
1             package Hadoop::HDFS::Command;
2             $Hadoop::HDFS::Command::VERSION = '0.005';
3 1     1   65052 use 5.010;
  1         3  
4 1     1   5 use strict;
  1         2  
  1         19  
5 1     1   4 use warnings;
  1         2  
  1         33  
6 1     1   295 use Capture::Tiny ();
  1         22854  
  1         29  
7 1     1   7 use Carp ();
  1         2  
  1         15  
8 1     1   375 use Data::Dumper;
  1         5871  
  1         77  
9 1     1   394 use DateTime::Format::Strptime;
  1         436101  
  1         6  
10 1     1   98 use DateTime;
  1         2  
  1         20  
11 1     1   510 use Getopt::Long ();
  1         7962  
  1         33  
12 1     1   550 use IPC::Cmd ();
  1         33035  
  1         26  
13 1     1   10 use Ref::Util ();
  1         1  
  1         16  
14 1     1   4 use Time::HiRes qw( time );
  1         2  
  1         8  
15 1     1   519 use Types::Standard qw(Bool Str);
  1         54384  
  1         11  
16              
17 1     1   1168 { use Moo; }
  1         5049  
  1         6  
18              
19             has cmd_hdfs => (
20             is => 'rw',
21             isa => sub {
22             my $val = shift;
23             return if $val && -e $val && -x _;
24             Carp::confess sprintf "The command `%s` either does not exist or not an executable!",
25             $val,
26             ;
27             },
28             default => sub { '/usr/bin/hdfs' },
29             lazy => 1,
30             );
31              
32             has enable_log => (
33             is => 'rw',
34             isa => Bool,
35             default => sub { 0 },
36             lazy => 1,
37             );
38              
39             has runas => (
40             is => 'rw',
41             isa => Str,
42             default => scalar getpwuid $<,
43             lazy => 1,
44             );
45              
46             before ['_capture', '_capture_with_stdin'] => sub {
47             my ($self, $options, @cmd) = @_;
48             unshift @cmd, 'sudo', '-u', $self->runas
49             unless $self->runas eq getpwuid $<;
50             @_ = ($self, $options, @cmd);
51             };
52              
53             sub dfs {
54 0     0 1   my $self = shift;
55 0 0         my $options = Ref::Util::is_hashref $_[0] ? shift( @_ ) : {};
56 0   0       (my $cmd = shift || die "No dfs command specified") =~ s{ \A [-]+ }{}xms;
57 0           my $method = '_dfs_' . $cmd;
58 0 0         Carp::croak "'$cmd' is not implemented!" if ! $self->can( $method );
59 0           $self->$method( $options, @_ );
60             }
61              
62             sub _dfs_ls {
63 0     0     my $self = shift;
64 0           state $strp;
65              
66 0           my $options = shift;
67 0           my @params = @_;
68 0           my @flags = qw( d h R );
69 0           my($arg, $paths) = $self->_parse_options(
70             \@params,
71             \@flags,
72             undef,
73             {
74             require_params => 1,
75             },
76             );
77              
78 0           my $want_epoch = $options->{want_epoch};
79 0           my $cb = delete $options->{callback};
80              
81 0 0         if ( $cb ) {
82 0 0         die "callback needs to be a CODE" if ! Ref::Util::is_coderef $cb;
83 0 0         if ( defined wantarray ) {
84 0           Carp::croak "You need to call this function in void context when callback is specified";
85             }
86             }
87              
88             my @response = $self->_capture(
89             $options,
90             $self->cmd_hdfs,
91             qw( dfs -ls ),
92 0           ( map { '-' . $_ } grep { $arg->{ $_ } } @flags ),
  0            
93 0           @{ $paths },
  0            
94             );
95              
96             # directory is empty
97             #
98 0 0         return if ! @response;
99              
100 0 0 0       if ( $response[0] && $response[0] =~ m{ \A Found \s+ [0-9] }xms ) {
101 0           shift @response; # junk
102             }
103              
104 0           my $space = q{ };
105              
106 0           my @rv;
107 0           for my $line ( @response ) {
108 0           my($mode, $replication, $user, $group, @unknown) = split m{ \s+ }xms, $line, 5;
109 0           my @rest = map { split $space, $_ } @unknown;
  0            
110 0           my $size;
111 0 0         if ( $arg->{h}) {
112 0 0 0       if ( $rest[0] eq '0' || $rest[1] !~ m{ [a-zA-Z_] }xms ) {
113 0           $size = shift @rest;
114             }
115             else {
116 0           $size = join $space, shift @rest, shift @rest;
117             }
118             }
119             else {
120 0           $size = shift @rest;
121             }
122 0           my $date = join ' ', shift @rest, shift @rest;
123 0   0       my $path = shift( @rest ) || die "Unable to parse $line to gather the path";
124 0 0         my $is_dir = $mode =~ m{ \A [d] }xms ? 1 : 0;
125              
126 0 0         my %record = (
127             mode => $mode,
128             replication => $replication,
129             user => $user,
130             group => $group,
131             size => $size,
132             date => $date,
133             path => $path,
134             type => $is_dir ? 'dir' : 'file',
135             );
136              
137 0 0         if ( $want_epoch ) {
138 0   0       $strp ||= DateTime::Format::Strptime->new(
139             pattern => '%Y-%m-%d %H:%M',
140             time_zone => 'CET',
141             on_error => 'croak',
142             );
143             eval {
144 0           $record{epoch} = $strp->parse_datetime( $date )->epoch;
145 0           1;
146 0 0         } or do {
147 0   0       my $eval_error = $@ || 'Zombie error';
148 0           $self->_log( debug => 'Failed to convert %s into an epoch: %s',
149             $date,
150             $eval_error,
151             );
152             };
153             }
154              
155 0 0         if ( @rest ) {
156             # interpret as the rest of the path as spaces in paths are ok
157             # possibly this will need to be revisited in the future.
158             #
159 0           $record{path} = join $space, $record{path}, @rest;
160             }
161              
162 0 0         if ( $cb ) {
163             # control the flow from the callback
164             # So, the return value matters.
165             #
166 0 0         if ( ! $cb->( \%record ) ) {
167 0           $self->_log( info => 'Terminating the ls processing as the user callback did not return a true value.');
168 0           last;
169             }
170 0           next;
171             }
172              
173 0           push @rv, { %record };
174             }
175              
176 0 0         return if $cb;
177 0           return @rv;
178             }
179              
180             sub _dfs_du {
181 0     0     my $self = shift;
182 0           my $options = shift;
183 0           my @params = @_;
184 0           my @flags = qw( h s );
185 0           my($arg, $paths) = $self->_parse_options(
186             \@params,
187             \@flags,
188             undef,
189             {
190             require_params => 1,
191             },
192             );
193              
194             my @rv = $self->_capture(
195             $options,
196             $self->cmd_hdfs,
197             qw( dfs -du ),
198 0           ( map { '-' . $_ } grep { $arg->{ $_ } } @flags ),
  0            
199 0 0         @{ $paths },
  0            
200             ) or die "No output collected from -du command";
201              
202             return map {
203 0           my @val = split m{ \s{2,} }xms, $_;
  0            
204             {
205 0 0         size => shift( @val ),
206             name => pop( @val ),
207             ( @val ? (
208             disk_space_consumed => shift( @val ),
209             ) : () ),
210             }
211             } @rv;
212             }
213              
214             sub _dfs_mv {
215 0     0     my $self = shift;
216 0           my $options = shift;
217 0           my @params = @_;
218 0           my($arg, $paths) = $self->_parse_options(
219             \@params,
220             [],
221             undef,
222             {
223             require_params => 1,
224             },
225             );
226 0   0       my $source = shift @{ $paths } || die "Source path not specified";
227 0   0       my $target = shift @{ $paths } || die "Target path not specified";
228              
229             # will die on error
230 0           $self->_capture(
231             $options,
232             $self->cmd_hdfs,
233             qw( dfs -mv ),
234             $source => $target,
235             );
236              
237 0           return;
238             }
239              
240             sub _dfs_rm {
241 0     0     my $self = shift;
242 0           my $options = shift;
243 0           my @params = @_;
244 0           my @flags = qw( f r skipTrash );
245 0           my($arg, $paths) = $self->_parse_options(
246             \@params,
247             \@flags,
248             undef,
249             {
250             require_params => 1,
251             },
252             );
253              
254             my @response = $self->_capture(
255             $options,
256             $self->cmd_hdfs,
257             qw( dfs -rm ),
258 0           ( map { '-' . $_ } grep { $arg->{ $_ } } @flags ),
  0            
259 0           @{ $paths },
  0            
260             );
261              
262             # just a confirmation message
263 0           return @response;
264             }
265              
266             sub _dfs_put {
267 0     0     my $self = shift;
268 0           my $options = shift;
269 0           my @params = @_;
270 0           my @flags = qw( f p l - );
271 0           my($arg, $paths) = $self->_parse_options(
272             \@params,
273             \@flags,
274             undef,
275             {
276             require_params => 1,
277             },
278             );
279              
280 0 0 0       if ( $paths->[0] && $paths->[0] eq '\-' ) {
281 0           shift @{ $paths };
  0            
282 0   0       $options->{stdin} = pop( @{ $paths } ) || die "stdin content not specified!";
283             }
284              
285 0 0         if ( @{ $paths } < ( $options->{stdin} ? 1 : 2 ) ) {
  0 0          
286 0           die "Missing arguments!";
287             }
288              
289             my @response = $self->_capture_with_stdin(
290             $options,
291             $self->cmd_hdfs,
292             qw( dfs -put ),
293 0 0         ( map { $_ eq '-' ? $_ : '-' . $_ } grep { $arg->{ $_ } } @flags ),
  0            
294             ( $options->{stdin} ? '-' : () ),
295 0 0         @{ $paths },
  0            
296             );
297              
298             # just a confirmation message
299 0           return @response;
300             }
301              
302             sub _dfs_test {
303 0     0     my $self = shift;
304 0           my $options = shift;
305 0           my @params = @_;
306 0           my @flags = qw( d e f s z );
307 0           my($arg, $paths) = $self->_parse_options(
308             \@params,
309             \@flags,
310             undef,
311             {
312             require_params => 1,
313             },
314             );
315 0 0         eval {
316             $self->_capture(
317             $options,
318             $self->cmd_hdfs,
319             qw( dfs -test ),
320 0           ( map { '-' . $_ } grep { $arg->{ $_ } } @flags ),
  0            
321 0           @{ $paths },
  0            
322             );
323 0           return 1;
324             } or return 0;
325             }
326              
327             sub _dfs_mkdir {
328 0     0     my $self = shift;
329 0           my $options = shift;
330 0           my @params = @_;
331 0           my @flags = qw( p );
332 0           my($arg, $paths) = $self->_parse_options(
333             \@params,
334             \@flags,
335             undef,
336             {
337             require_params => 1,
338             },
339             );
340             my @response = $self->_capture(
341             $options,
342             $self->cmd_hdfs,
343             qw( dfs -mkdir ),
344 0           ( map { '-' . $_ } grep { $arg->{ $_ } } @flags ),
  0            
345 0           @{ $paths },
  0            
346             );
347              
348             # just a confirmation message
349             return @response
350 0           }
351              
352             sub _dfs_chmod {
353 0     0     my $self = shift;
354 0           my $options = shift;
355 0           my @params = @_;
356 0           my @flags = qw( p );
357 0           my($arg, $paths) = $self->_parse_options(
358             \@params,
359             \@flags,
360             undef,
361             {
362             require_params => 1,
363             },
364             );
365             my @response = $self->_capture(
366             $options,
367             $self->cmd_hdfs,
368             qw( dfs -chmod ),
369 0           ( map { '-' . $_ } grep { $arg->{ $_ } } @flags ),
  0            
370 0           @{ $paths },
  0            
371             );
372              
373             # just a confirmation message
374             return @response
375 0           }
376              
377             sub _dfs_chown {
378 0     0     my $self = shift;
379 0           my $options = shift;
380 0           my @params = @_;
381 0           my @flags = qw( p );
382 0           my($arg, $paths) = $self->_parse_options(
383             \@params,
384             \@flags,
385             undef,
386             {
387             require_params => 1,
388             },
389             );
390             my @response = $self->_capture(
391             $options,
392             $self->cmd_hdfs,
393             qw( dfs -chown ),
394 0           ( map { '-' . $_ } grep { $arg->{ $_ } } @flags ),
  0            
395 0           @{ $paths },
  0            
396             );
397              
398             # just a confirmation message
399             return @response
400 0           }
401              
402             sub _dfs_get {
403 0     0     my $self = shift;
404 0           my $options = shift;
405 0           my @params = @_;
406 0           my @flags = qw( p ignoreCrc crc );
407 0           my($arg, $paths) = $self->_parse_options(
408             \@params,
409             \@flags,
410             undef,
411             {
412             require_params => 1,
413             },
414             );
415             my @response = $self->_capture(
416             $options,
417             $self->cmd_hdfs,
418             qw( dfs -get ),
419 0           ( map { '-' . $_ } grep { $arg->{ $_ } } @flags ),
  0            
420 0           @{ $paths },
  0            
421             );
422              
423             # just a confirmation message
424             return @response
425 0           }
426              
427             sub _parse_options {
428 0     0     my $self = shift;
429             # TODO: collect dfs generic options
430             #
431             # Generic options supported are
432             # -conf <configuration file> specify an application configuration file
433             # -D <property=value> use value for given property
434             # -fs <local|namenode:port> specify a namenode
435             # -jt <local|resourcemanager:port> specify a ResourceManager
436             # -files <comma separated list of files> specify comma separated files to be copied to the map reduce cluster
437             # -libjars <comma separated list of jars> specify comma separated jar files to include in the classpath.
438             # -archives <comma separated list of archives> specify comma separated archives to be unarchived on the compute machines.
439              
440 0           my($params, $flags, $opt, $conf) = @_;
441 0   0       $conf ||= {};
442 0 0         my @params = map { $_ eq '-' ? '\-' : $_ } @{ $params };
  0            
  0            
443              
444             Getopt::Long::GetOptionsFromArray(
445             \@params,
446             \my %arg,
447             (
448 0 0         map { Ref::Util::is_arrayref $_ ? @{ $_ } : () }
  0 0          
  0            
449             $flags,
450             $opt,
451             ),
452 0           ) || die qq{Unable to parse parameters: '@{$params}'};
453              
454 0 0 0       if ( $conf->{require_params} && ! @params ) {
455 0           die "No parameters were specified!";
456             }
457              
458 0           return \%arg, [ @params ];
459             }
460              
461             sub _capture {
462             my $self = shift;
463             my $options = shift;
464             my @cmd = @_;
465              
466             $self->_log( debug => 'Executing command: %s', join(' ', @cmd) );
467              
468             my $start = time;
469              
470             my($stdout, $stderr, $fail) = Capture::Tiny::capture {
471             system( @cmd );
472             };
473              
474             $self->_log( debug => 'Execution took %.3f seconds', time - $start );
475              
476             if ( $fail ) {
477             my $code = $fail >> 8;
478             $stderr ||= '[no error]';
479             my $msg = "External command (@cmd) failed with status=$code: $stderr";
480             if ( $options->{ignore_fail} ) {
481             if ( ! $options->{silent} ) {
482             warn "[Fatal error downgraded to a warning] $msg";
483             }
484             return $self->_split_on_newlines( $stdout || '' );
485             }
486             die $msg;
487             }
488              
489             if ( $stderr ) {
490             warn "Warning from external command: $stderr";
491             }
492              
493             return $self->_split_on_newlines( $stdout );
494             }
495              
496             sub _capture_with_stdin {
497             my $self = shift;
498             # TODO: use a single capture method.
499             my $options = shift;
500             my @cmd = @_;
501              
502             my $stdin = delete $options->{stdin};
503              
504             $self->_log( debug => 'Executing command(IPC): %s', join(' ', @cmd) );
505              
506             my $start = time;
507              
508             my $res = IPC::Cmd::run_forked(
509             \@cmd,
510             {
511             ( $stdin ? (
512             child_stdin => $stdin,
513             ) : () ),
514             #timeout => $timeout,
515             terminate_on_parent_sudden_death => 1,
516             }
517             );
518              
519             $self->_log( debug => 'Execution took %.3f seconds', time - $start );
520              
521             my($stdout, $stderr, $fail);
522              
523             my $success = defined $res->{exit_code}
524             && $res->{exit_code} == 0
525             && ! $res->{timeout};
526              
527             $fail = $success ? 0 : $res->{exit_code};
528             $stderr = $res->{stderr};
529             $stdout = $res->{stdout};
530              
531             if ( $fail ) {
532             my $code = $fail >> 8;
533             $stderr ||= $res->{err_msg} || '[no error]';
534             my $msg = "External command (@cmd) failed with status=$code: $stderr";
535             if ( $options->{ignore_fail} ) {
536             if ( ! $options->{silent} ) {
537             warn "[Fatal error downgraded to a warning] $msg";
538             }
539             return $self->_split_on_newlines( $stdout || '' );
540             }
541             die $msg;
542             }
543              
544             if ( $stderr ) {
545             warn "Warning from external command: $stderr";
546             }
547              
548             return $self->_split_on_newlines( $stdout );
549             }
550              
551             sub _split_on_newlines {
552 0     0     my $self = shift;
553 0           my $rv = shift;
554              
555 0           $rv =~ s{ \A \s+ }{}xms;
556 0           $rv =~ s{ \s+ \z }{}xms;
557              
558 0           return split m{ \n+ }xms, $rv;
559             }
560              
561             sub _log {
562 0     0     my $self = shift;
563 0 0         return if ! $self->enable_log;
564 0           my($level, $tmpl, @param) = @_;
565 0           my $msg = sprintf "[%s] %s\n", uc $level, $tmpl;
566 0           printf STDERR $msg, @param;
567             }
568              
569             1;
570              
571             __END__
572              
573             =pod
574              
575             =encoding UTF-8
576              
577             =head1 NAME
578              
579             Hadoop::HDFS::Command
580              
581             =head1 VERSION
582              
583             version 0.005
584              
585             =head1 SYNOPSIS
586              
587             use Hadoop::HDFS::Command;
588             my $hdfs = Hadoop::HDFS::Command->new;
589             my @rv = $hdfs->$command( @command_args );
590              
591             =head1 DESCRIPTION
592              
593             This is a simple wrapper around the hdfs commandline to make them easier to
594             call from Perl and parse their output.
595              
596             The interface is partially done at the moment (see the implemented wrappers
597             down below).
598              
599             You can always use the WebHDFS to do similar operations instead of failling
600             back to the commandline. However there are several benefits of using the
601             cli; i) you'll end up with a single C<JVM> invocation, so the response
602             might be faster ii) Some functionality / endpoints might be buggy for WebHDFS
603             but might work with the cli (for example escaping certain values is broken
604             in some versions, but works with the cli).
605              
606             =head1 NAME
607              
608             Hadoop::HDFS::Command - Wrappers for various hadoop hdfs cli commands
609              
610             =head1 METHODS
611              
612             =head2 new
613              
614             The constructor. Available attributes are listed below.
615              
616             =head3 cmd_hdfs
617              
618             Default value is C</usr/bin/hdfs>. This option needs to be altered if you have
619             the C<`hdfs`> command in some other place.
620              
621             =head3 enable_log :Bool
622              
623             Can be used to enable the internal logging feature. Disabled by default.
624              
625             =head2 dfs
626              
627             One of the top level commands, including an interface to the sub-commands
628             listed below. The calling convention of the sub commands is as simple as:
629              
630             my @rv = $hdfs->dfs( \%options, $sub_command => @subcommand_args );
631             # options hash is optional
632             my @rv = $hdfs->dfs( $sub_command => @subcommand_args );
633              
634             Available options are listed below:
635              
636             =over 4
637              
638             =item ignore_fail :Bool
639              
640             Global.
641              
642             =item silent :Bool
643              
644             Global.
645              
646             =item want_epoch :Bool
647              
648             Only used for C<ls>. Converts timestamps to epoch.
649              
650             =item callback :CODE
651              
652             Only used for C<ls>. The callback always needs to return true to continue
653             processing, returning false from it will short-circuit the processor.
654              
655             =back
656              
657             =head3 du
658              
659             The C<@subcommand_args> can have these defined: C<-s>, C<-h>.
660              
661             my @rv = $hdfs->dfs( du => @subcommand_args => $hdfs_path );
662             my @rv = $hdfs->dfs( du => qw( -h -s ) => "/tmp" );
663             my @rv = $hdfs->dfs(
664             {
665             ignore_fail => 1,
666             silent => 1,
667             },
668             du => -s => @hdfs_paths,
669             );
670              
671             =head3 ls
672              
673             The C<@subcommand_args> can have these defined: C<-d>, C<-h>, C<R>.
674              
675             my @rv = $hdfs->dfs( ls => @subcommand_args => $hdfs_path );
676              
677             The callback can be used to prevent buffering and process the result set yourself.
678             The callback always needs to return true to continue processing. If you want to
679             skip some entries but continue processing then a true value needs to be returned.
680             A bare return (which is false) will short circuit the iterator and discard any
681             remaining records.
682              
683             my %options = (
684             callback => sub {
685             # This callback will receive a hash meta-data about the file.
686             my $file = shift;
687             if ( $file->{type} eq 'dir' ) {
688             # do something
689             }
690              
691             # skip this one, but continue processing
692             return 1 if $file->{type} ne 'file';
693              
694             # do something
695              
696             return if $something_really_bad_so_end_this_processor;
697              
698             # continue processing
699             return 1;
700             },
701             # The meta-data passed to the callback will have an "epoch"
702             # key set when this is true.
703             want_epoch => 1,
704             );
705             # execute the command recursively on the path
706             $hdfs->dfs( \%options, ls => -R => $hdfs_path );
707              
708             =head3 mv
709              
710             my @rv = $hdfs->dfs( mv => $hdfs_source_path, $hdfs_dest_path );
711              
712             =head3 put
713              
714             The C<@subcommand_args> can have these defined: C<-f>, C<-p>, C<-l>
715              
716             $hdfs->dfs( put => @subcommand_args, $local_path, $hdfs_path );
717             # notice the additional "-"
718             $hdfs->dfs( put => '-f', '-', $hdfs_path, $in_memory_data );
719              
720             =head3 rm
721              
722             The C<@subcommand_args> can have these defined: C<-f>, C<-r>, C<-skipTrash>
723              
724             $hdfs->dfs( rm => @subcommand_args, $hdfs_path );
725              
726             =head3 test
727              
728             The C<@subcommand_args> can have these defined: C<-d>, C<-e>, C<-f>, C<-s>, C<-z>
729              
730             $hdfs->dfs( test => @subcommand_args, $hdfs_path );
731              
732             =head3 mkdir
733              
734             The C<@subcommand_args> can have these defined: C<-p>
735              
736             $hdfs->dfs( mkdir => @subcommand_args, $path );
737              
738             =head3 chmod
739              
740             The C<@subcommand_args> can have these defined: C<-R>
741              
742             $hdfs->dfs( chmod => @subcommand_args, $mode, $path );
743              
744             =head3 chown
745              
746             The C<@subcommand_args> can have these defined: C<-R>
747              
748             $hdfs->dfs( chown => @subcommand_args, $OWNERCOLONGROUP, $path );
749              
750             =head3 get
751              
752             The C<@subcommand_args> can have these defined: C<-p>, C<-ignoreCrc>, C<-crc>
753              
754             $hdfs->dfs( get => @subcommand_args, $src, $localdst );
755              
756             =head1 SEE ALSO
757              
758             C<`hdfs dfs -help`>.
759              
760             =head1 AUTHOR
761              
762             Burak Gursoy <burak@cpan.org>
763              
764             =head1 COPYRIGHT AND LICENSE
765              
766             This software is copyright (c) 2016 by Burak Gursoy.
767              
768             This is free software; you can redistribute it and/or modify it under
769             the same terms as the Perl 5 programming language system itself.
770              
771             =cut