File Coverage

blib/lib/Net/Hadoop/WebHDFS.pm
Criterion Covered Total %
statement 41 355 11.5
branch 8 174 4.6
condition 12 82 14.6
subroutine 13 68 19.1
pod 25 43 58.1
total 99 722 13.7


line stmt bran cond sub pod time code
1             package Net::Hadoop::WebHDFS;
2              
3 1     1   22521 use strict;
  1         1  
  1         23  
4 1     1   3 use warnings;
  1         1  
  1         17  
5 1     1   3 use Carp;
  1         4  
  1         44  
6              
7 1     1   693 use JSON::XS qw//;
  1         4561  
  1         20  
8              
9 1     1   421 use Furl;
  1         18952  
  1         25  
10 1     1   6 use File::Spec;
  1         1  
  1         15  
11 1     1   504 use URI;
  1         3089  
  1         27  
12 1     1   452 use Try::Tiny;
  1         986  
  1         46  
13              
14 1     1   4 use constant GENERIC_FS_ACTION_WITH_NO_PATH => '';
  1         1  
  1         3429  
15              
16             our $VERSION = "0.8";
17              
18             our %OPT_TABLE = ();
19              
20             sub new {
21 5     5 1 13123 my ($this, %opts) = @_;
22             my $self = +{
23             host => $opts{host} || 'localhost',
24             port => $opts{port} || 50070,
25             standby_host => $opts{standby_host},
26             standby_port => ($opts{standby_port} || $opts{port} || 50070),
27             httpfs_mode => $opts{httpfs_mode} || 0,
28             username => $opts{username},
29             doas => $opts{doas},
30             useragent => $opts{useragent} || 'Furl Net::Hadoop::WebHDFS (perl)',
31             timeout => $opts{timeout} || 10,
32 5   50     109 suppress_errors => $opts{suppress_errors} || 0,
      50        
      50        
      50        
      50        
      50        
      50        
33             last_error => undef,
34             under_failover => 0,
35             };
36 5         22 $self->{furl} = Furl::HTTP->new(agent => $self->{useragent}, timeout => $self->{timeout}, max_redirects => 0);
37 5         134 return bless $self, $this;
38             }
39              
40             # curl -i -X PUT "http://:/webhdfs/v1/?op=CREATE
41             # [&overwrite=][&blocksize=][&replication=]
42             # [&permission=][&buffersize=]"
43             sub create {
44 0     0 1 0 my ($self, $path, $body, %options) = @_;
45 0 0       0 if ($self->{httpfs_mode}) {
46 0         0 %options = (%options, data => 'true');
47             }
48 0         0 my $err = $self->check_options('CREATE', %options);
49 0 0       0 croak $err if $err;
50              
51 0         0 my $res = $self->operate_requests('PUT', $path, 'CREATE', \%options, $body);
52 0         0 $res->{code} == 201;
53             }
54             $OPT_TABLE{CREATE} = ['overwrite', 'blocksize', 'replication', 'permission', 'buffersize', 'data'];
55              
56             # curl -i -X POST "http://:/webhdfs/v1/?op=APPEND
57             # [&buffersize=]"
58             sub append {
59 0     0 1 0 my ($self, $path, $body, %options) = @_;
60 0 0       0 if ($self->{httpfs_mode}) {
61 0         0 %options = (%options, data => 'true');
62             }
63 0         0 my $err = $self->check_options('APPEND', %options);
64 0 0       0 croak $err if $err;
65              
66 0         0 my $res = $self->operate_requests('POST', $path, 'APPEND', \%options, $body);
67 0         0 $res->{code} == 200;
68             }
69             $OPT_TABLE{APPEND} = ['buffersize', 'data'];
70              
71             # curl -i -L "http://:/webhdfs/v1/?op=OPEN
72             # [&offset=][&length=][&buffersize=]"
73             sub read {
74 0     0 1 0 my ($self, $path, %options) = @_;
75 0         0 my $err = $self->check_options('OPEN', %options);
76 0 0       0 croak $err if $err;
77              
78 0         0 my $res = $self->operate_requests('GET', $path, 'OPEN', \%options);
79 0         0 $res->{body};
80             }
81             $OPT_TABLE{OPEN} = ['offset', 'length', 'buffersize'];
82 0     0 0 0 sub open { (shift)->read(@_); }
83              
84             # curl -i -X PUT "http://:/?op=MKDIRS
85             # [&permission=]"
86             sub mkdir {
87 0     0 1 0 my ($self, $path, %options) = @_;
88 0         0 my $err = $self->check_options('MKDIRS', %options);
89 0 0       0 croak $err if $err;
90              
91 0         0 my $res = $self->operate_requests('PUT', $path, 'MKDIRS', \%options);
92 0         0 $self->check_success_json($res, 'boolean');
93             }
94             $OPT_TABLE{MKDIRS} = ['permission'];
95 0     0 0 0 sub mkdirs { (shift)->mkdir(@_); }
96              
97             # curl -i -X PUT "http://:/webhdfs/v1/?op=RENAME
98             # &destination="
99             sub rename {
100 0     0 1 0 my ($self, $path, $dest, %options) = @_;
101 0         0 my $err = $self->check_options('RENAME', %options);
102 0 0       0 croak $err if $err;
103              
104 0 0       0 unless ($dest =~ m!^/!) {
105 0         0 $dest = '/' . $dest;
106             }
107 0         0 my $res = $self->operate_requests('PUT', $path, 'RENAME', {%options, destination => $dest});
108 0         0 $self->check_success_json($res, 'boolean');
109             }
110              
111             # curl -i -X DELETE "http://:/webhdfs/v1/?op=DELETE
112             # [&recursive=]"
113             sub delete {
114 0     0 1 0 my ($self, $path, %options) = @_;
115 0         0 my $err = $self->check_options('DELETE', %options);
116 0 0       0 croak $err if $err;
117              
118 0         0 my $res = $self->operate_requests('DELETE', $path, 'DELETE', \%options);
119 0         0 $self->check_success_json($res, 'boolean');
120             }
121             $OPT_TABLE{DELETE} = ['recursive'];
122              
123             # curl -i "http://:/webhdfs/v1/?op=GETFILESTATUS"
124             sub stat {
125 0     0 1 0 my ($self, $path, %options) = @_;
126 0         0 my $err = $self->check_options('GETFILESTATUS', %options);
127 0 0       0 croak $err if $err;
128              
129 0         0 my $res = $self->operate_requests('GET', $path, 'GETFILESTATUS', \%options);
130 0         0 $self->check_success_json($res, 'FileStatus');
131             }
132 0     0 0 0 sub getfilestatus { (shift)->stat(@_); }
133              
134             # curl -i "http://:/webhdfs/v1/?op=LISTSTATUS"
135             sub list {
136 0     0 1 0 my ($self, $path, %options) = @_;
137 0         0 my $err = $self->check_options('LISTSTATUS', %options);
138 0 0       0 croak $err if $err;
139              
140 0         0 my $res = $self->operate_requests('GET', $path, 'LISTSTATUS', \%options);
141 0         0 $self->check_success_json($res, 'FileStatuses')->{FileStatus};
142             }
143 0     0 0 0 sub liststatus { (shift)->list(@_); }
144              
145             # curl -i "http://:/webhdfs/v1/?op=GETCONTENTSUMMARY"
146             sub content_summary {
147 0     0 1 0 my ($self, $path, %options) = @_;
148 0         0 my $err = $self->check_options('GETCONTENTSUMMARY', %options);
149 0 0       0 croak $err if $err;
150              
151 0         0 my $res = $self->operate_requests('GET', $path, 'GETCONTENTSUMMARY', \%options);
152 0         0 $self->check_success_json($res, 'ContentSummary');
153             }
154 0     0 0 0 sub getcontentsummary { (shift)->content_summary(@_); }
155              
156             # curl -i "http://:/webhdfs/v1/?op=GETFILECHECKSUM"
157             sub checksum {
158 0     0 1 0 my ($self, $path, %options) = @_;
159 0         0 my $err = $self->check_options('GETFILECHECKSUM', %options);
160 0 0       0 croak $err if $err;
161              
162 0         0 my $res = $self->operate_requests('GET', $path, 'GETFILECHECKSUM', \%options);
163 0         0 $self->check_success_json($res, 'FileChecksum');
164             }
165 0     0 0 0 sub getfilechecksum { (shift)->checksum(@_); }
166              
167             # curl -i "http://:/webhdfs/v1/?op=GETHOMEDIRECTORY"
168             sub homedir {
169 0     0 1 0 my ($self, %options) = @_;
170 0         0 my $err = $self->check_options('GETHOMEDIRECTORY', %options);
171 0 0       0 croak $err if $err;
172              
173 0         0 my $res = $self->operate_requests('GET', '/', 'GETHOMEDIRECTORY', \%options);
174 0         0 $self->check_success_json($res, 'Path');
175             }
176 0     0 0 0 sub gethomedirectory { (shift)->homedir(@_); }
177              
178             # curl -i -X PUT "http://:/webhdfs/v1/?op=SETPERMISSION
179             # [&permission=]"
180             sub chmod {
181 0     0 1 0 my ($self, $path, $mode, %options) = @_;
182 0         0 my $err = $self->check_options('SETPERMISSION', %options);
183 0 0       0 croak $err if $err;
184              
185 0         0 my $res = $self->operate_requests('PUT', $path, 'SETPERMISSION', {%options, permission => $mode});
186 0         0 $res->{code} == 200;
187             }
188 0     0 0 0 sub setpermission { (shift)->chmod(@_); }
189              
190             # curl -i -X PUT "http://:/webhdfs/v1/?op=SETOWNER
191             # [&owner=][&group=]"
192             sub chown {
193 0     0 1 0 my ($self, $path, %options) = @_;
194 0         0 my $err = $self->check_options('SETOWNER', %options);
195 0 0       0 croak $err if $err;
196              
197 0 0 0     0 unless (defined($options{owner}) or defined($options{group})) {
198 0         0 croak "'chown' needs at least one of owner or group";
199             }
200              
201 0         0 my $res = $self->operate_requests('PUT', $path, 'SETOWNER', \%options);
202 0         0 $res->{code} == 200;
203             }
204             $OPT_TABLE{SETOWNER} = ['owner', 'group'];
205 0     0 0 0 sub setowner { (shift)->chown(@_); }
206              
207             # curl -i -X PUT "http://:/webhdfs/v1/?op=SETREPLICATION
208             # [&replication=]"
209             sub replication {
210 0     0 1 0 my ($self, $path, $replnum, %options) = @_;
211 0         0 my $err = $self->check_options('SETREPLICATION', %options);
212 0 0       0 croak $err if $err;
213              
214 0         0 my $res = $self->operate_requests('PUT', $path, 'SETREPLICATION', {%options, replication => $replnum});
215 0         0 $self->check_success_json($res, 'boolean');
216             }
217 0     0 0 0 sub setreplication { (shift)->replication(@_); }
218              
219             # curl -i -X PUT "http://:/webhdfs/v1/?op=SETTIMES
220             # [&modificationtime=
221             # modificationtime: radix-10 long integer
222             # accesstime: radix-10 long integer
223             $OPT_TABLE{SETTIMES} = [ qw( modificationtime accesstime ) ];
224             sub touch {
225 0     0 1 0 my ($self, $path, %options) = @_;
226 0         0 my $err = $self->check_options('SETTIMES', %options);
227 0 0       0 croak $err if $err;
228              
229 0 0 0     0 unless (defined($options{modificationtime}) or defined($options{accesstime})) {
230 0         0 croak "'touch' needs at least one of modificationtime or accesstime";
231             }
232              
233 0         0 my $res = $self->operate_requests('PUT', $path, 'SETTIMES', \%options);
234 0         0 $res->{code} == 200;
235             }
236              
237             #---------------------------- EXTENDED ATTRIBUTES START -----------------------#
238              
239             sub xattr {
240 0     0 1 0 my($self, $path, $action, @args) = @_;
241 0 0       0 croak "No action defined for xattr" if ! $action;
242 0         0 my $target = sprintf '_%s_xattr', $action;
243 0         0 my $target2 = sprintf '_%s_xattrs', $action;
244 0   0     0 my $method = $self->can( $target )
245             || $self->can( $target2 )
246             || croak "invalid action `$action`";
247 0         0 $self->$method( $path, @args );
248             }
249              
250             # curl -i "http://:/webhdfs/v1/?op=GETXATTRS
251             # &xattr.name=&encoding="
252             #
253             # curl -i "http://:/webhdfs/v1/?op=GETXATTRS
254             # &xattr.name=&xattr.name=&encoding="
255             $OPT_TABLE{GETXATTRS} = [qw( names encoding flatten )];
256             sub _get_xattrs {
257 0     0   0 my($self, $path, %options) = @_;
258 0         0 my $err = $self->check_options('GETXATTRS', %options);
259 0 0       0 croak $err if $err;
260              
261 0         0 my $flatten = delete $options{flatten};
262              
263             # limit to a subset? will return all of the attributes otherwise
264 0 0       0 if ( my $name = delete $options{names} ) {
265 0 0       0 croak "getxattrs: name needs to be an arrayref" if ref $name ne 'ARRAY';
266 0         0 $options{'xattr.name'} = $name;
267             }
268              
269 0         0 my $res = $self->operate_requests('GET', $path, 'GETXATTRS', \%options);
270 0 0       0 if ( my $rv = $self->check_success_json($res, 'XAttrs') ) {
271 0 0       0 croak "Unexpected return value from listxattrs: $rv"
272             if ref $rv ne 'ARRAY';
273 0 0       0 return $rv if ! $flatten;
274 0         0 map { @{ $_ }{qw/ name value /} } @{ $rv };
  0         0  
  0         0  
  0         0  
275             }
276             }
277              
278             # curl -i "http://:/webhdfs/v1/?op=LISTXATTRS"
279             sub _list_xattrs {
280 0     0   0 my($self, $path) = @_;
281              
282 0         0 my $res = $self->operate_requests('GET', $path, 'LISTXATTRS');
283 0 0       0 if ( my $rv = $self->check_success_json($res, 'XAttrNames') ) {
284 0         0 my $attr = JSON::XS::decode_json $rv;
285 0 0       0 croak "Unexpected return value from listxattrs: $attr"
286             if ref $attr ne 'ARRAY';
287 0         0 return @{ $attr };
  0         0  
288             }
289             }
290              
291             # curl -i -X PUT "http://:/webhdfs/v1/?op=SETXATTR
292             # &xattr.name=&xattr.value=&flag="
293             # https://blog.cloudera.com/blog/2014/06/why-extended-attributes-are-coming-to-hdfs/
294             # flag: [CREATE,REPLACE]
295             $OPT_TABLE{SETXATTR} = [qw( name value flag )];
296             sub _set_xattr {
297 0     0   0 my($self, $path, %options) = @_;
298 0         0 my $err = $self->check_options('SETXATTR', %options);
299 0 0       0 croak $err if $err;
300              
301 0 0       0 croak "value of xattr not set" if ! exists $options{value};
302              
303 0   0     0 $options{ 'xattr.name' } = delete $options{name} || croak "name of xattr not set";
304 0         0 $options{ 'xattr.value' } = delete $options{value};
305              
306 0 0       0 croak 'flag was not specified.' if ! $options{flag};
307              
308 0         0 my $res = $self->operate_requests( PUT => $path, 'SETXATTR', \%options);
309 0         0 $res->{code} == 200;
310             }
311              
312             sub _create_xattr {
313 0     0   0 my($self, $path, $name, $value) = @_;
314 0         0 $self->_set_xattr(
315             $path,
316             name => $name,
317             value => $value,
318             flag => 'CREATE',
319             );
320             }
321              
322             sub _replace_xattr {
323 0     0   0 my($self, $path, $name, $value) = @_;
324 0         0 $self->_set_xattr(
325             $path,
326             name => $name,
327             value => $value,
328             flag => 'REPLACE',
329             );
330             }
331              
332             # curl -i -X PUT "http://:/webhdfs/v1/?op=REMOVEXATTR
333             # &xattr.name="
334             sub _remove_xattr {
335 0     0   0 my($self, $path, $name) = @_;
336              
337 0         0 my %options;
338 0   0     0 $options{'xattr.name'} = $name || croak "xattr name was not specified";
339              
340 0         0 my $res = $self->operate_requests( PUT => $path, 'REMOVEXATTR', \%options);
341 0         0 $res->{code} == 200;
342             }
343              
344             #---------------------------- EXTENDED ATTRIBUTES END -------------------------#
345              
346             # curl -i "http://:/webhdfs/v1/?op=CHECKACCESS
347             # &fsaction=
348             # this seems to be broken in some versions. You may get a "No enum constant ..."
349             # error if this is the case.
350             # Also see https://issues.apache.org/jira/browse/HDFS-9695
351             #
352             sub checkaccess {
353 0     0 1 0 my($self, $path, $fsaction, %options) = @_;
354 0 0       0 croak "checkaccess: fsaction parameter was not specified" if ! $fsaction;
355 0         0 my $err = $self->check_options('CHECKACCESS', %options);
356 0 0       0 croak $err if $err;
357              
358 0         0 $options{fsaction} = $fsaction;
359              
360 0         0 my $res = $self->operate_requests('GET', $path, 'CHECKACCESS', \%options);
361 0         0 $res->{code} == 200;
362             }
363              
364             # curl -i -X POST "http://:/webhdfs/v1/?op=CONCAT
365             # &sources="
366             sub concat {
367 0     0 1 0 my($self, $path, @sources) = @_;
368              
369 0 0       0 croak "At least one source path needs to be specified" if ! @sources;
370              
371 0         0 my $paths = join q{,}, @sources;
372              
373 0         0 my $res = $self->operate_requests(
374             POST => $path,
375             'CONCAT',
376             { sources => $paths },
377             );
378 0         0 $res->{code} == 200;
379             }
380              
381             # curl -i -X POST "http://:/webhdfs/v1/?op=TRUNCATE
382             # &newlength="
383             # Available after Hadoop v2.7
384             # https://issues.apache.org/jira/browse/HDFS-7655
385             #
386             sub truncate {
387 0     0 1 0 my($self, $path, $newlength) = @_;
388 0 0       0 $newlength = 0 if ! defined $newlength;
389              
390 0         0 my $res = $self->operate_requests(
391             POST => $path,
392             'TRUNCATE',
393             { newlength => $newlength },
394             );
395              
396 0 0       0 if ( my $rv = $self->check_success_json($res, 'boolean') ) {
397 0         0 $rv eq 'true';
398             }
399             }
400              
401             # curl -i -X PUT "http://:/webhdfs/v1/?op=CREATESYMLINK
402             # &destination=[&createParent=]"
403             # currently broken/disabled
404             # https://issues.apache.org/jira/browse/HADOOP-10019
405              
406             #$OPT_TABLE{CREATESYMLINK} = [qw( destination createParent )];
407             #sub createsymlink {
408             # # Not available yet
409             # # https://issues.apache.org/jira/browse/HADOOP-10019
410             # my($self, $path, $destination, $createParent) = @_;
411             #
412             # croak "createsymlink: destination not specified" if ! $destination;
413             #
414             # my %options = (
415             # destination => $destination,
416             # ($createParent ? (
417             # createParent => $createParent ? 'true' : 'false',
418             # ) : ())
419             # );
420             #
421             # my $res = $self->operate_requests( PUT => $path, 'CREATESYMLINK', \%options);
422             # $res->{code} == 200;
423             #}
424              
425             #---------------------------- DELEGATION TOKEN START --------------------------#
426             # Also see
427             # http://hadoop.apache.org/docs/r2.6.0/hadoop-hdfs-httpfs/httpfs-default.html
428              
429             # GETDELEGATIONTOKENS: Obsolete and removed after HDFS-10200, HDFS-3667
430             #
431              
432             sub delegation_token {
433 0     0 1 0 my($self, $action, @args) = @_;
434 0 0       0 croak "No action defined for delegation_token" if ! $action;
435 0         0 my $target = sprintf '_%s_delegation_token', $action;
436 0 0       0 croak "invalid action $action" if ! $self->can( $target );
437 0         0 $self->$target( @args );
438             }
439              
440             # curl -i "http://:/webhdfs/v1/?op=GETDELEGATIONTOKEN
441             # &renewer=&service=&kind="
442             # kind: The kind of the delegation token requested
443             # (Server sets the default kind for the service)
444             # A string that represents token kind e.g "HDFS_DELEGATION_TOKEN" or "WEBHDFS delegation"
445             # service: The name of the service where the token is supposed to be used, e.g. ip:port of the namenode
446             #
447             $OPT_TABLE{GETDELEGATIONTOKEN} = [qw( renewer service kind )];
448             sub _get_delegation_token {
449 0     0   0 my($self, $path, %options) = @_;
450 0         0 my $err = $self->check_options('GETDELEGATIONTOKEN', %options);
451 0 0       0 croak $err if $err;
452              
453 0 0 0     0 $options{renewer} ||= $self->{username} if $self->{username};
454              
455 0         0 my $res = $self->operate_requests( GET => $path, 'GETDELEGATIONTOKEN', \%options);
456              
457 0 0       0 if ( my $rv = $self->check_success_json($res, 'Token') ) {
458 0         0 $rv->{urlString};
459             }
460             }
461              
462             # curl -i -X PUT "http://:/webhdfs/v1/?op=RENEWDELEGATIONTOKEN
463             # &token="
464             sub _renew_delegation_token {
465 0     0   0 my($self, $token) = @_;
466              
467 0 0       0 croak "No token was specified" if ! $token;
468              
469 0         0 my $res = $self->operate_requests(
470             PUT => GENERIC_FS_ACTION_WITH_NO_PATH,
471             'RENEWDELEGATIONTOKEN',
472             { token => $token },
473             );
474 0 0       0 if ( my $rv = $self->check_success_json($res, 'long') ) {
475 0         0 $rv; # new expiration time in miliseconds
476             }
477             }
478              
479             # curl -i -X PUT "http://:/webhdfs/v1/?op=CANCELDELEGATIONTOKEN
480             # &token="
481             sub _cancel_delegation_token {
482 0     0   0 my($self, $token) = @_;
483              
484 0 0       0 croak "No token was specified" if ! $token;
485              
486 0         0 my $res = $self->operate_requests(
487             PUT => GENERIC_FS_ACTION_WITH_NO_PATH,
488             'CANCELDELEGATIONTOKEN',
489             { token => $token },
490             );
491 0         0 $res->{code} == 200;
492             }
493              
494             #---------------------------- DELEGATION TOKEN END ----------------------------#
495              
496             #---------------------------- SNAPSHOT START ----------------------------------#
497              
498             # Needs testing, seems to be buggy and can be destructive in earlier versions
499             # i.e.: https://issues.apache.org/jira/browse/HDFS-9406
500             #
501             # Snaphotting is not enabled by default and this needs to be executed as a super user:
502             # hdfs dfsadmin -allowSnapshot $path
503             #
504             sub snapshot {
505 0     0 1 0 my($self, $path, $action, @args) = @_;
506 0 0       0 croak "No action defined for delegation_token" if ! $action;
507 0         0 my $target = sprintf '_%s_snapshot', $action;
508 0 0       0 croak sprintf "%s: invalid action $action", (caller 0)[3] if ! $self->can( $target );
509 0         0 $self->$target( $path => @args );
510             }
511              
512             # curl -i -X PUT "http://:/webhdfs/v1/?op=CREATESNAPSHOT
513             # [&snapshotname=]"
514             sub _create_snapshot {
515 0     0   0 my($self, $path, $snapshotname) = @_;
516              
517 0         0 my %options;
518 0 0       0 $options{snapshotname} = $snapshotname if $snapshotname;
519 0         0 my $res = $self->operate_requests('PUT', $path, 'CREATESNAPSHOT', \%options);
520 0 0       0 if ( my $rv = $self->check_success_json($res, 'Path') ) {
521 0         0 $rv;
522             }
523             }
524              
525             # curl -i -X PUT "http://:/webhdfs/v1/?op=RENAMESNAPSHOT
526             # &oldsnapshotname=&snapshotname="
527             sub _rename_snapshot {
528 0     0   0 my($self, $path, $oldsnapshotname, $snapshotname) = @_;
529              
530 0         0 my %options = (
531             oldsnapshotname => $oldsnapshotname,
532             snapshotname => $snapshotname,
533             );
534              
535 0         0 my $res = $self->operate_requests('PUT', $path, 'RENAMESNAPSHOT', \%options);
536 0         0 $res->{code} == 200;
537             }
538              
539             # curl -i -X DELETE "http://:/webhdfs/v1/?op=DELETESNAPSHOT
540             # &snapshotname="
541             sub _delete_snapshot {
542 0     0   0 my($self, $path, $snapshotname) = @_;
543 0 0       0 croak "No snapshotname specified" if ! $snapshotname;
544              
545 0         0 my %options = (
546             snapshotname => $snapshotname,
547             );
548 0         0 my $res = $self->operate_requests('DELETE', $path, 'DELETESNAPSHOT', \%options);
549 0         0 $res->{code} == 200;
550             }
551              
552             #---------------------------- SNAPSHOT END ------------------------------------#
553              
554             sub touchz {
555 0     0 1 0 my ($self, $path) = @_;
556 0         0 return $self->create( $path, '', overwrite => 'true' );
557             }
558              
559 0     0 0 0 sub settimes { (shift)->touch(@_); }
560              
561             # sub delegation_token {}
562             # sub renew_delegation_token {}
563             # sub cancel_delegation_token {}
564              
565             sub check_options {
566 0     0 0 0 my ($self, $op, %opts) = @_;
567 0         0 my @ex = ();
568 0   0     0 my $opts = $OPT_TABLE{$op} || [];
569 0         0 foreach my $k (keys %opts) {
570 0 0       0 push @ex, $k if scalar(grep {$k eq $_} @$opts) < 1;
  0         0  
571             }
572 0 0       0 return undef unless @ex;
573 0         0 'no such option: ' . join(' ', @ex);
574             }
575              
576             sub check_success_json {
577 8     8 0 19 my ($self, $res, $attr) = @_;
578             $res->{code} == 200 and $res->{content_type} =~ m!^application/json! and
579 8 100 66     80 (not defined($attr) or JSON::XS::decode_json($res->{body})->{$attr});
      100        
580             }
581              
582             sub api_path {
583 10     10 0 17 my ($self, $path) = @_;
584 10 100       55 return '/webhdfs/v1' . $path if $path =~ m!^/!;
585 1         5 '/webhdfs/v1/' . $path;
586             }
587              
588             sub build_path {
589 7     7 0 1807 my ($self, $path, $op, %params) = @_;
590             my %opts = (('op' => $op),
591             ($self->{username} ? ('user.name' => $self->{username}) : ()),
592 7 100       33 ($self->{doas} ? ('doas' => $self->{doas}) : ()),
    100          
593             %params);
594 7         25 my $u = URI->new('', 'http');
595 7         5985 $u->query_form(%opts);
596 7         408 $self->api_path($path) . $u->path_query; # path_query() #=> '?foo=1&bar=2'
597             }
598              
599             sub connect_to {
600 0     0 0   my $self = shift;
601 0 0         if ($self->{under_failover}) {
602 0           return ($self->{standby_host}, $self->{standby_port});
603             }
604 0           return ($self->{host}, $self->{port});
605             }
606              
607             our %REDIRECTED_OPERATIONS = (APPEND => 1, CREATE => 1, OPEN => 1, GETFILECHECKSUM => 1);
608             sub operate_requests {
609 0     0 0   my ($self, $method, $path, $op, $params, $payload) = @_;
610              
611 0           my ($host, $port) = $self->connect_to();
612              
613 0           my $headers = []; # or undef ?
614 0 0 0       if ($self->{httpfs_mode} or not $REDIRECTED_OPERATIONS{$op}) {
615             # empty files are ok
616 0 0 0       if ($self->{httpfs_mode} and defined($payload)) {
617 0           $headers = ['Content-Type' => 'application/octet-stream'];
618             }
619 0           return $self->request($host, $port, $method, $path, $op, $params, $payload, $headers);
620             }
621              
622             # pattern for not httpfs and redirected by namenode
623 0           my $res = $self->request($host, $port, $method, $path, $op, $params, undef);
624 0 0 0       unless ($res->{code} >= 300 and $res->{code} <= 399 and $res->{location}) {
      0        
625 0           my $code = $res->{code};
626 0           my $body = $res->{body};
627 0           croak "NameNode returns non-redirection (or without location header), code:$code, body:$body.";
628             }
629 0           my $uri = URI->new($res->{location});
630 0           $headers = ['Content-Type' => 'application/octet-stream'];
631 0           return $self->request($uri->host, $uri->port, $method, $uri->path_query, undef, {}, $payload, $headers);
632             }
633              
634             sub request {
635 0     0 0   my $self = shift;
636 0 0         return $self->_request(@_) unless $self->{suppress_errors};
637              
638             try {
639 0     0     $self->_request(@_);
640             } catch {
641 0     0     $self->{last_error} = $_;
642 0           0;
643 0           };
644             }
645              
646             # IllegalArgumentException 400 Bad Request
647             # UnsupportedOperationException 400 Bad Request
648             # SecurityException 401 Unauthorized
649             # IOException 403 Forbidden
650             # FileNotFoundException 404 Not Found
651             # RumtimeException 500 Internal Server Error
652             sub _request {
653 0     0     my ($self, $host, $port, $method, $path, $op, $params, $payload, $header) = @_;
654              
655 0 0         my $request_path = $op ? $self->build_path($path, $op, %$params) : $path;
656             my ($ver, $code, $msg, $headers, $body) = $self->{furl}->request(
657 0 0         method => $method,
658             host => $host,
659             port => $port,
660             path_query => $request_path,
661             headers => $header,
662             ($payload ? (content => $payload) : ()),
663             );
664              
665 0           my $res = { code => $code, body => $body };
666              
667 0           for (my $i = 0; $i < scalar(@$headers); $i += 2) {
668 0           my $header = $headers->[$i];
669 0           my $value = $headers->[$i + 1];
670              
671 0 0         if ($header =~ m!^location$!i) { $res->{location} = $value; }
  0 0          
672 0           elsif ($header =~ m!^content-type$!i) { $res->{content_type} = $value; }
673             }
674              
675 0 0 0       return $res if $code >= 200 and $code <= 299;
676 0 0 0       return $res if $code >= 300 and $code <= 399;
677              
678 0   0       my $errmsg = $res->{body} || 'Response body is empty...';
679 0           $errmsg =~ s/\n//g;
680              
681 0 0         if ($code == 400) { croak "ClientError: $errmsg"; }
  0 0          
    0          
    0          
    0          
682 0           elsif ($code == 401) { croak "SecurityError: $errmsg"; }
683             elsif ($code == 403) {
684 0 0         if ($errmsg =~ /org\.apache\.hadoop\.ipc\.StandbyException/) {
685 0 0 0       if ($self->{httpfs_mode} || not defined($self->{standby_host})) {
    0          
686             # failover is disabled
687             } elsif ($self->{retrying}) {
688             # more failover is prohibited
689 0           $self->{retrying} = 0;
690             } else {
691 0           $self->{under_failover} = not $self->{under_failover};
692 0           $self->{retrying} = 1;
693 0           my ($next_host, $next_port) = $self->connect_to();
694 0           my $val = $self->request($next_host, $next_port, $method, $path, $op, $params, $payload, $header);
695 0           $self->{retrying} = 0;
696 0           return $val;
697             }
698             }
699 0           croak "IOError: $errmsg";
700             }
701 0           elsif ($code == 404) { croak "FileNotFoundError: $errmsg"; }
702 0           elsif ($code == 500) { croak "ServerError: $errmsg"; }
703              
704 0           croak "RequestFailedError, code:$code, message:$errmsg";
705             }
706              
707             sub exists {
708 0     0 1   my $self = shift;
709 0   0       my $path = shift || croak "No HDFS path was specified";
710 0           my $stat;
711             eval {
712 0           $stat = $self->stat( $path );
713 0           1;
714 0 0         } or do {
715 0   0       my $eval_error = $@ || 'Zombie error';
716 0 0         return if $eval_error =~ m<
717             \QFileNotFoundError: {"RemoteException":{"message":"File does not exist:\E
718             >xms;
719             # just re-throw
720 0           croak $eval_error;
721             };
722 0           return $stat;
723             }
724              
725             sub find {
726 0     0 1   my $self = shift;
727 0   0       my $file_path = shift || croak "No file path specified";
728 0           my $cb = shift;
729 0 0 0       my $opt = @_ && ref $_[-1] eq 'HASH' ? pop @_ : {};
730              
731 0 0         if ( ref $cb ne 'CODE' ) {
732 0           die "Call back needs to be a CODE ref";
733             }
734              
735 0           my $suppress = $self->{suppress_errors};
736             # can be used to quickly skip the java junk like, file names starting with
737             # underscores, etc.
738 0 0         my $re_ignore = $opt->{re_ignore} ? qr/$opt->{re_ignore}/ : undef;
739              
740             #
741             # No such thing like symlinks (yet) in HDFS, in case you're wondering:
742             # https://issues.apache.org/jira/browse/HADOOP-10019
743             # although check that link yourself
744             #
745 0           my $looper;
746             $looper = sub {
747 0     0     my $thing = shift;
748 0 0         if ( ! $self->exists( $thing ) ) {
749             # should happen at the start, so this will short-circuit the recursion
750 0           warn "The HDFS directory specified ($thing) does not exist! Please guard your HDFS paths with exists()";
751 0           return;
752             }
753 0           my $list = $self->list( $thing );
754 0           foreach my $e ( @{ $list } ) {
  0            
755 0           my $path = $e->{pathSuffix};
756 0           my $type = $e->{type};
757              
758 0 0 0       next if $re_ignore && $path && $path =~ $re_ignore;
      0        
759              
760 0 0         if ( $type eq 'DIRECTORY' ) {
    0          
761 0           $cb->( $thing, $e );
762             eval {
763 0           $looper->( File::Spec->catdir( $thing, $path ) );
764 0           1;
765 0 0         } or do {
766 0   0       my $eval_error = $@ || 'Zombie error';
767 0 0         if ( $suppress ) {
768 0           warn "[ERROR DOWNGRADED] Failed to check $thing/$path: $eval_error";
769 0           next;
770             }
771 0           croak $eval_error;
772             }
773             }
774             elsif ( $type eq 'FILE' ) {
775 0           $cb->( $thing, $e );
776             }
777             else {
778 0           my $msg = "I don't know what to do with type=$type!";
779 0 0         if ( $suppress ) {
780 0           warn "[ERROR DOWNGRADED] $msg";
781 0           next;
782             }
783 0           croak $msg;
784             }
785             }
786 0           return;
787 0           };
788              
789 0           $looper->( $file_path );
790              
791 0           return;
792             }
793              
794             1;
795              
796             __END__