File Coverage

blib/lib/Net/Hadoop/WebHDFS.pm
Criterion Covered Total %
statement 32 187 17.1
branch 8 88 9.0
condition 12 49 24.4
subroutine 10 41 24.3
pod 17 35 48.5
total 79 400 19.7


line stmt bran cond sub pod time code
1             package Net::Hadoop::WebHDFS;
2              
3 1     1   46196 use strict;
  1         2  
  1         30  
4 1     1   4 use warnings;
  1         2  
  1         22  
5 1     1   4 use Carp;
  1         5  
  1         62  
6              
7 1     1   1001 use JSON::XS qw//;
  1         16228  
  1         35  
8              
9 1     1   1198 use Furl;
  1         202556  
  1         40  
10 1     1   1011 use URI;
  1         4339  
  1         2694  
11              
12             our $VERSION = "0.6";
13              
14             our %OPT_TABLE = ();
15              
16             sub new {
17 5     5 1 30458 my ($this, %opts) = @_;
18 5   50     152 my $self = +{
      50        
      50        
      50        
      50        
      50        
19             host => $opts{host} || 'localhost',
20             port => $opts{port} || 50070,
21             standby_host => $opts{standby_host},
22             standby_port => ($opts{standby_port} || $opts{port} || 50070),
23             httpfs_mode => $opts{httpfs_mode} || 0,
24             username => $opts{username},
25             doas => $opts{doas},
26             useragent => $opts{useragent} || 'Furl Net::Hadoop::WebHDFS (perl)',
27             timeout => $opts{timeout} || 10,
28             under_failover => 0,
29             };
30 5         43 $self->{furl} = Furl::HTTP->new(agent => $self->{useragent}, timeout => $self->{timeout}, max_redirects => 0);
31 5         410 return bless $self, $this;
32             }
33              
34             # curl -i -X PUT "http://:/webhdfs/v1/?op=CREATE
35             # [&overwrite=][&blocksize=][&replication=]
36             # [&permission=][&buffersize=]"
37             sub create {
38 0     0 1 0 my ($self, $path, $body, %options) = @_;
39 0 0       0 if ($self->{httpfs_mode}) {
40 0         0 %options = (%options, data => 'true');
41             }
42 0         0 my $err = $self->check_options('CREATE', %options);
43 0 0       0 croak $err if $err;
44              
45 0         0 my $res = $self->operate_requests('PUT', $path, 'CREATE', \%options, $body);
46 0         0 $res->{code} == 201;
47             }
48             $OPT_TABLE{CREATE} = ['overwrite', 'blocksize', 'replication', 'permission', 'buffersize', 'data'];
49              
50             # curl -i -X POST "http://:/webhdfs/v1/?op=APPEND[&buffersize=]"
51             sub append {
52 0     0 1 0 my ($self, $path, $body, %options) = @_;
53 0 0       0 if ($self->{httpfs_mode}) {
54 0         0 %options = (%options, data => 'true');
55             }
56 0         0 my $err = $self->check_options('APPEND', %options);
57 0 0       0 croak $err if $err;
58              
59 0         0 my $res = $self->operate_requests('POST', $path, 'APPEND', \%options, $body);
60 0         0 $res->{code} == 200;
61             }
62             $OPT_TABLE{APPEND} = ['buffersize', 'data'];
63              
64             # curl -i -L "http://:/webhdfs/v1/?op=OPEN
65             # [&offset=][&length=][&buffersize=]"
66             sub read {
67 0     0 1 0 my ($self, $path, %options) = @_;
68 0         0 my $err = $self->check_options('OPEN', %options);
69 0 0       0 croak $err if $err;
70              
71 0         0 my $res = $self->operate_requests('GET', $path, 'OPEN', \%options);
72 0         0 $res->{body};
73             }
74             $OPT_TABLE{OPEN} = ['offset', 'length', 'buffersize'];
75 0     0 0 0 sub open { (shift)->read(@_); }
76              
77             # curl -i -X PUT "http://:/?op=MKDIRS[&permission=]"
78             sub mkdir {
79 0     0 1 0 my ($self, $path, %options) = @_;
80 0         0 my $err = $self->check_options('MKDIRS', %options);
81 0 0       0 croak $err if $err;
82              
83 0         0 my $res = $self->operate_requests('PUT', $path, 'MKDIRS', \%options);
84 0         0 $self->check_success_json($res, 'boolean');
85             }
86             $OPT_TABLE{MKDIRS} = ['permission'];
87 0     0 0 0 sub mkdirs { (shift)->mkdir(@_); }
88              
89             # curl -i -X PUT ":/webhdfs/v1/?op=RENAME&destination="
90             sub rename {
91 0     0 1 0 my ($self, $path, $dest, %options) = @_;
92 0         0 my $err = $self->check_options('RENAME', %options);
93 0 0       0 croak $err if $err;
94              
95 0 0       0 unless ($dest =~ m!^/!) {
96 0         0 $dest = '/' . $dest;
97             }
98 0         0 my $res = $self->operate_requests('PUT', $path, 'RENAME', {%options, destination => $dest});
99 0         0 $self->check_success_json($res, 'boolean');
100             }
101              
102             # curl -i -X DELETE "http://:/webhdfs/v1/?op=DELETE
103             # [&recursive=]"
104             sub delete {
105 0     0 1 0 my ($self, $path, %options) = @_;
106 0         0 my $err = $self->check_options('DELETE', %options);
107 0 0       0 croak $err if $err;
108              
109 0         0 my $res = $self->operate_requests('DELETE', $path, 'DELETE', \%options);
110 0         0 $self->check_success_json($res, 'boolean');
111             }
112             $OPT_TABLE{DELETE} = ['recursive'];
113              
114             # curl -i "http://:/webhdfs/v1/?op=GETFILESTATUS"
115             sub stat {
116 0     0 1 0 my ($self, $path, %options) = @_;
117 0         0 my $err = $self->check_options('GETFILESTATUS', %options);
118 0 0       0 croak $err if $err;
119              
120 0         0 my $res = $self->operate_requests('GET', $path, 'GETFILESTATUS', \%options);
121 0         0 $self->check_success_json($res, 'FileStatus');
122             }
123 0     0 0 0 sub getfilestatus { (shift)->stat(@_); }
124              
125             # curl -i "http://:/webhdfs/v1/?op=LISTSTATUS"
126             sub list {
127 0     0 1 0 my ($self, $path, %options) = @_;
128 0         0 my $err = $self->check_options('LISTSTATUS', %options);
129 0 0       0 croak $err if $err;
130              
131 0         0 my $res = $self->operate_requests('GET', $path, 'LISTSTATUS', \%options);
132 0         0 $self->check_success_json($res, 'FileStatuses')->{FileStatus};
133             }
134 0     0 0 0 sub liststatus { (shift)->list(@_); }
135              
136             # curl -i "http://:/webhdfs/v1/?op=GETCONTENTSUMMARY"
137             sub content_summary {
138 0     0 1 0 my ($self, $path, %options) = @_;
139 0         0 my $err = $self->check_options('GETCONTENTSUMMARY', %options);
140 0 0       0 croak $err if $err;
141              
142 0         0 my $res = $self->operate_requests('GET', $path, 'GETCONTENTSUMMARY', \%options);
143 0         0 $self->check_success_json($res, 'ContentSummary');
144             }
145 0     0 0 0 sub getcontentsummary { (shift)->content_summary(@_); }
146              
147             # curl -i "http://:/webhdfs/v1/?op=GETFILECHECKSUM"
148             sub checksum {
149 0     0 1 0 my ($self, $path, %options) = @_;
150 0         0 my $err = $self->check_options('GETFILECHECKSUM', %options);
151 0 0       0 croak $err if $err;
152              
153 0         0 my $res = $self->operate_requests('GET', $path, 'GETFILECHECKSUM', \%options);
154 0         0 $self->check_success_json($res, 'FileChecksum');
155             }
156 0     0 0 0 sub getfilechecksum { (shift)->checksum(@_); }
157              
158             # curl -i "http://:/webhdfs/v1/?op=GETHOMEDIRECTORY"
159             sub homedir {
160 0     0 1 0 my ($self, %options) = @_;
161 0         0 my $err = $self->check_options('GETHOMEDIRECTORY', %options);
162 0 0       0 croak $err if $err;
163              
164 0         0 my $res = $self->operate_requests('GET', '/', 'GETHOMEDIRECTORY', \%options);
165 0         0 $self->check_success_json($res, 'Path');
166             }
167 0     0 0 0 sub gethomedirectory { (shift)->homedir(@_); }
168              
169             # curl -i -X PUT "http://:/webhdfs/v1/?op=SETPERMISSION
170             # [&permission=]"
171             sub chmod {
172 0     0 1 0 my ($self, $path, $mode, %options) = @_;
173 0         0 my $err = $self->check_options('SETPERMISSION', %options);
174 0 0       0 croak $err if $err;
175              
176 0         0 my $res = $self->operate_requests('PUT', $path, 'SETPERMISSION', {%options, permission => $mode});
177 0         0 $res->{code} == 200;
178             }
179 0     0 0 0 sub setpermission { (shift)->chmod(@_); }
180              
181             # curl -i -X PUT "http://:/webhdfs/v1/?op=SETOWNER
182             # [&owner=][&group=]"
183             sub chown {
184 0     0 1 0 my ($self, $path, %options) = @_;
185 0         0 my $err = $self->check_options('SETOWNER', %options);
186 0 0       0 croak $err if $err;
187              
188 0 0 0     0 unless (defined($options{owner}) or defined($options{group})) {
189 0         0 croak "'chown' needs at least one of owner or group";
190             }
191              
192 0         0 my $res = $self->operate_requests('PUT', $path, 'SETOWNER', \%options);
193 0         0 $res->{code} == 200;
194             }
195             $OPT_TABLE{SETOWNER} = ['owner', 'group'];
196 0     0 0 0 sub setowner { (shift)->chown(@_); }
197              
198             # curl -i -X PUT "http://:/webhdfs/v1/?op=SETREPLICATION
199             # [&replication=]"
200             sub replication {
201 0     0 1 0 my ($self, $path, $replnum, %options) = @_;
202 0         0 my $err = $self->check_options('SETREPLICATION', %options);
203 0 0       0 croak $err if $err;
204              
205 0         0 my $res = $self->operate_requests('PUT', $path, 'SETREPLICATION', {%options, replication => $replnum});
206 0         0 $self->check_success_json($res, 'boolean');
207             }
208 0     0 0 0 sub setreplication { (shift)->replication(@_); }
209              
210             # curl -i -X PUT "http://:/webhdfs/v1/?op=SETTIMES
211             # [&modificationtime=
212             # modificationtime: radix-10 long integer
213             # accesstime: radix-10 long integer
214             $OPT_TABLE{SETTIMES} = [ qw( modificationtime accesstime ) ];
215             sub touch {
216 0     0 1 0 my ($self, $path, %options) = @_;
217 0         0 my $err = $self->check_options('SETTIMES', %options);
218 0 0       0 croak $err if $err;
219              
220 0 0 0     0 unless (defined($options{modificationtime}) or defined($options{accesstime})) {
221 0         0 croak "'touch' needs at least one of modificationtime or accesstime";
222             }
223              
224 0         0 my $res = $self->operate_requests('PUT', $path, 'SETTIMES', \%options);
225 0         0 $res->{code} == 200;
226             }
227              
228             sub touchz {
229 0     0 1 0 my ($self, $path) = @_;
230 0         0 return $self->create( $path, '', overwrite => 'true' );
231             }
232              
233 0     0 0 0 sub settimes { (shift)->touch(@_); }
234              
235             # sub delegation_token {}
236             # sub renew_delegation_token {}
237             # sub cancel_delegation_token {}
238              
239             sub check_options {
240 0     0 0 0 my ($self, $op, %opts) = @_;
241 0         0 my @ex = ();
242 0   0     0 my $opts = $OPT_TABLE{$op} || [];
243 0         0 foreach my $k (keys %opts) {
244 0 0       0 push @ex, $k if scalar(grep {$k eq $_} @$opts) < 1;
  0         0  
245             }
246 0 0       0 return undef unless @ex;
247 0         0 'no such option: ' . join(' ', @ex);
248             }
249              
250             sub check_success_json {
251 8     8 0 663 my ($self, $res, $attr) = @_;
252 8 100 100     184 $res->{code} == 200 and $res->{content_type} =~ m!^application/json! and
      100        
253             (not defined($attr) or JSON::XS::decode_json($res->{body})->{$attr});
254             }
255              
256             sub api_path {
257 10     10 0 25 my ($self, $path) = @_;
258 10 100       71 return '/webhdfs/v1' . $path if $path =~ m!^/!;
259 1         5 '/webhdfs/v1/' . $path;
260             }
261              
262             sub build_path {
263 7     7 0 2719 my ($self, $path, $op, %params) = @_;
264 7 100       52 my %opts = (('op' => $op),
    100          
265             ($self->{username} ? ('user.name' => $self->{username}) : ()),
266             ($self->{doas} ? ('doas' => $self->{doas}) : ()),
267             %params);
268 7         31 my $u = URI->new('', 'http');
269 7         10839 $u->query_form(%opts);
270 7         776 $self->api_path($path) . $u->path_query; # path_query() #=> '?foo=1&bar=2'
271             }
272              
273             sub connect_to {
274 0     0 0   my $self = shift;
275 0 0         if ($self->{under_failover}) {
276 0           return ($self->{standby_host}, $self->{standby_port});
277             }
278 0           return ($self->{host}, $self->{port});
279             }
280              
281             our %REDIRECTED_OPERATIONS = (APPEND => 1, CREATE => 1, OPEN => 1, GETFILECHECKSUM => 1);
282             sub operate_requests {
283 0     0 0   my ($self, $method, $path, $op, $params, $payload) = @_;
284              
285 0           my ($host, $port) = $self->connect_to();
286              
287 0           my $headers = []; # or undef ?
288 0 0 0       if ($self->{httpfs_mode} or not $REDIRECTED_OPERATIONS{$op}) {
289             # empty files are ok
290 0 0 0       if ($self->{httpfs_mode} and defined($payload)) {
291 0           $headers = ['Content-Type' => 'application/octet-stream'];
292             }
293 0           return $self->request($host, $port, $method, $path, $op, $params, $payload, $headers);
294             }
295              
296             # pattern for not httpfs and redirected by namenode
297 0           my $res = $self->request($host, $port, $method, $path, $op, $params, undef);
298 0 0 0       unless ($res->{code} >= 300 and $res->{code} <= 399 and $res->{location}) {
      0        
299 0           my $code = $res->{code};
300 0           my $body = $res->{body};
301 0           croak "NameNode returns non-redirection (or without location header), code:$code, body:$body.";
302             }
303 0           my $uri = URI->new($res->{location});
304 0           $headers = ['Content-Type' => 'application/octet-stream'];
305 0           return $self->request($uri->host, $uri->port, $method, $uri->path_query, undef, {}, $payload, $headers);
306             }
307              
308             # IllegalArgumentException 400 Bad Request
309             # UnsupportedOperationException 400 Bad Request
310             # SecurityException 401 Unauthorized
311             # IOException 403 Forbidden
312             # FileNotFoundException 404 Not Found
313             # RumtimeException 500 Internal Server Error
314             sub request {
315 0     0 0   my ($self, $host, $port, $method, $path, $op, $params, $payload, $header) = @_;
316              
317 0 0         my $request_path = $op ? $self->build_path($path, $op, %$params) : $path;
318 0 0         my ($ver, $code, $msg, $headers, $body) = $self->{furl}->request(
319             method => $method,
320             host => $host,
321             port => $port,
322             path_query => $request_path,
323             headers => $header,
324             ($payload ? (content => $payload) : ()),
325             );
326              
327 0           my $res = { code => $code, body => $body };
328              
329 0           for (my $i = 0; $i < scalar(@$headers); $i += 2) {
330 0           my $header = $headers->[$i];
331 0           my $value = $headers->[$i + 1];
332              
333 0 0         if ($header =~ m!^location$!i) { $res->{location} = $value; }
  0 0          
334 0           elsif ($header =~ m!^content-type$!i) { $res->{content_type} = $value; }
335             }
336              
337 0 0 0       return $res if $code >= 200 and $code <= 299;
338 0 0 0       return $res if $code >= 300 and $code <= 399;
339              
340 0   0       my $errmsg = $res->{body} || 'Response body is empty...';
341 0           $errmsg =~ s/\n//g;
342              
343 0 0         if ($code == 400) { croak "ClientError: $errmsg"; }
  0 0          
    0          
    0          
    0          
344 0           elsif ($code == 401) { croak "SecurityError: $errmsg"; }
345             elsif ($code == 403) {
346 0 0         if ($errmsg =~ /org\.apache\.hadoop\.ipc\.StandbyException/) {
347 0 0 0       if ($self->{httpfs_mode} || not defined($self->{standby_host})) {
    0          
348             # failover is disabled
349             } elsif ($self->{retrying}) {
350             # more failover is prohibited
351 0           $self->{retrying} = 0;
352             } else {
353 0           $self->{under_failover} = not $self->{under_failover};
354 0           $self->{retrying} = 1;
355 0           my ($next_host, $next_port) = $self->connect_to();
356 0           my $val = $self->request($next_host, $next_port, $method, $path, $op, $params, $payload, $header);
357 0           $self->{retrying} = 0;
358 0           return $val;
359             }
360             }
361 0           croak "IOError: $errmsg";
362             }
363 0           elsif ($code == 404) { croak "FileNotFoundError: $errmsg"; }
364 0           elsif ($code == 500) { croak "ServerError: $errmsg"; }
365              
366 0           croak "RequestFailedError, code:$code, message:$errmsg";
367             }
368              
369             1;
370              
371             __END__