File Coverage

blib/lib/HBase/JSONRest.pm
Criterion Covered Total %
statement 89 270 32.9
branch 24 98 24.4
condition 11 57 19.3
subroutine 12 24 50.0
pod 7 7 100.0
total 143 456 31.3


line stmt bran cond sub pod time code
1             package HBase::JSONRest;
2              
3 2     2   25815 use strict;
  2         2  
  2         45  
4 2     2   5 use warnings;
  2         2  
  2         38  
5              
6 2     2   6 use Carp;
  2         5  
  2         101  
7 2     2   1167 use HTTP::Tiny;
  2         67100  
  2         62  
8 2     2   768 use URI::Escape;
  2         1795  
  2         92  
9 2     2   1058 use MIME::Base64;
  2         935  
  2         98  
10 2     2   1159 use JSON::XS qw(decode_json encode_json);
  2         10558  
  2         112  
11 2     2   917 use Time::HiRes qw(gettimeofday time);
  2         1813  
  2         8  
12 2     2   1327 use Data::Dumper;
  2         8129  
  2         106  
13              
14 2     2   1053 use IO::Uncompress::Gunzip qw(gunzip $GunzipError) ;
  2         52861  
  2         3962  
15              
16             our $VERSION = "0.046";
17              
18             my %INFO_ROUTES = (
19             version => '/version',
20             list => '/',
21             );
22              
23             ################
24             # Class Methods
25             #
26             sub new {
27 0     0 1 0 my $class = shift;
28 0         0 my %params = @_;
29              
30             # default port
31 0   0     0 $params{'port'} ||= 8080;
32              
33             # missing service, we'll create it ourselves
34 0 0       0 if ( ! defined $params{'service'} ) {
35             # but we need a host for that
36 0 0       0 defined $params{'host'}
37             or croak 'Must provide a service, or a host and port';
38              
39             # set it up
40             $params{'service'} =
41 0         0 sprintf 'http://%s:%d', @params{qw};
42             }
43              
44             my $http_tiny = HTTP::Tiny->new(
45 0 0       0 defined $params{'timeout'} ? ( timeout => $params{'timeout'} ) : (),
46             );
47              
48 0         0 my $strict_mode = 0;
49 0 0       0 if ($params{strict_mode}) {
50 0 0       0 if ($params{strict_mode} == 1) {
51 0         0 $strict_mode = $params{strict_mode};
52             }
53             else {
54 0         0 die "Invalid value. Strict mode can have only one of the following values: [undef, 0, 1]";
55             }
56             }
57              
58             # we only care about the service, and we assured it exists
59             return bless {
60 0         0 service => $params{'service'},
61             http_tiny => $http_tiny,
62             strict_mode => $strict_mode,
63             }, $class;
64             }
65              
66              
67             ###################
68             # Instance Methods
69             #
70              
71             # -------------------------------------------------------------------------
72             #
73             # list of tables
74             #
75             sub list {
76 0     0 1 0 my $self = shift;
77              
78 0         0 my $uri = $self->{service} . $INFO_ROUTES{list};
79              
80 0         0 my $rs = $self->{http_tiny}->get($uri, {
81             headers => {
82             'Accept' => 'application/json',
83             }
84             });
85              
86 0 0       0 return if $self->_handle_error( $uri, $rs );
87              
88 0         0 my $response = decode_json($rs->{content});
89              
90 0         0 my @tables = ();
91 0         0 foreach my $table (@{$response->{table}}) {
  0         0  
92 0         0 my $table_name = $table->{name};
93 0         0 push @tables, {name => $table_name};
94             }
95              
96 0         0 return \@tables;
97             }
98              
99             # -------------------------------------------------------------------------
100             #
101             # get hbase rest version info
102             #
103             sub version {
104 0     0 1 0 my $self = shift;
105              
106 0         0 my $uri = $self->{service} . $INFO_ROUTES{version};
107              
108 0         0 my $rs = $self->{http_tiny}->get($uri, {
109             headers => {
110             'Accept' => 'application/json',
111             }
112             });
113              
114 0 0       0 return if $self->_handle_error( $uri, $rs );
115              
116 0         0 my $response = decode_json($rs->{content});
117              
118 0         0 return $response;
119             }
120              
121             # -------------------------------------------------------------------------
122             #
123             # get
124             #
125             # usage:
126             #
127             # my $records = $hbase->get({
128             # table => $table_name,
129             # where => {
130             # key_equals => $key
131             # },
132             # columns => [
133             # 'd:some_column_name',
134             # 'd:some_other_column_name'
135             # ],
136             # versions => 100,
137             # timestamp_range => {
138             # from => $timestamp_from,
139             # until => $timestamp_until,
140             # }
141             # });
142             #
143             sub get {
144 0     0 1 0 my $self = shift;
145 0         0 my $params = shift;
146              
147 0         0 my $get_urls = _build_get_uri($params);
148              
149 0         0 my @result = ();
150 0         0 foreach my $url (@$get_urls) {
151              
152 0 0       0 if ( my $rows = $self->_get_tiny( $url->{url} ) ){
153              
154 0         0 push @result, @$rows;
155              
156             } else {
157              
158             # we allow for some keys to be missing but fail on other errors
159 0 0 0     0 return unless $self->{last_error} && ( $self->{last_error}->{type} // '' ) eq '404';
      0        
160              
161             }
162              
163             }
164              
165 0         0 return \@result;
166             }
167              
168             # _get_tiny
169             sub _get_tiny {
170              
171 0     0   0 my $self = shift;
172 0         0 my $uri = shift;
173              
174 0         0 my $url = $self->{service} . $uri;
175              
176 0         0 my $rs = $self->{http_tiny}->get($url, {
177             headers => {
178             'Accept' => 'application/json',
179             'Accept-Encoding' => 'gzip',
180             }
181             });
182              
183 0 0       0 return if $self->_handle_error( $uri, $rs, [ '404' ] );
184              
185 0         0 _maybe_decompress( $rs );
186 0         0 my $response = decode_json( $rs->{content} );
187              
188 0         0 my @rows = ();
189 0         0 foreach my $row (@{$response->{Row}}) {
  0         0  
190              
191 0         0 my $key = decode_base64($row->{key});
192 0         0 my @cols = ();
193              
194 0         0 foreach my $c (@{$row->{Cell}}) {
  0         0  
195 0         0 my $name = decode_base64($c->{column});
196 0         0 my $value = decode_base64($c->{'$'});
197 0         0 my $ts = $c->{timestamp};
198 0         0 push @cols, {name => $name, value => $value, timestamp => $ts};
199             }
200 0         0 push @rows, {row => $key, columns => \@cols};
201             }
202              
203 0         0 return \@rows;
204             }
205              
206             # -------------------------------------------------------------------------
207             #
208             # multiget
209             #
210             # usage:
211             #
212             # my $records = $hbase->multiget({
213             # table => $table_name,
214             # where => {
215             # key_in => \@keys
216             # },
217             # versions => $number_of_versions,
218             # });
219             #
220             sub multiget {
221 0     0 1 0 my $self = shift;
222 0         0 my $query = shift;
223              
224 0         0 my $where = $query->{where};
225 0 0 0     0 unless ($where->{key_in} && @{$where->{key_in}}) {
  0         0  
226             $self->{last_error} = {
227 0         0 type => "Bad request",
228             info => "No keys specified for multiget.",
229             uri => "Could not counstruct uri - no keys provided.",
230             };
231 0         0 return;
232             }
233              
234 0         0 my $multiget_urls = _build_multiget_uri($query);
235              
236 0         0 my @result = ();
237              
238 0         0 foreach my $url (@$multiget_urls) {
239              
240 0 0       0 if ( my $rows = $self->_multiget_tiny( $url->{url} ) ){
241              
242 0         0 push @result, @$rows;
243              
244             } else {
245              
246             # we allow for some keys to be missing but fail on other errors
247 0 0 0     0 return unless $self->{last_error} && ( $self->{last_error}->{type} // '' ) eq '404';
      0        
248              
249             }
250              
251             }
252              
253 0         0 return \@result;
254             }
255              
256             # -------------------------------------------------------------------------
257             #
258             # _multiget_tiny
259             #
260             sub _multiget_tiny {
261              
262 0     0   0 my $self = shift; # hbh
263 0         0 my $uri = shift;
264              
265 0         0 my $url = $self->{service} . $uri;
266              
267 0         0 my $data_format = 'application/json';
268              
269 0         0 my $rs = $self->{http_tiny}->get($url, {
270             headers => {
271             'Accept' => $data_format,
272             'Accept-Encoding' => 'gzip',
273             }
274             });
275              
276             # allow items to be missing for multiget
277 0 0       0 return if $self->_handle_error( $uri, $rs, [ '404' ] );
278              
279 0         0 _maybe_decompress( $rs );
280 0         0 my $response = decode_json($rs->{content});
281              
282 0         0 my @rows = ();
283 0         0 foreach my $row (@{$response->{Row}}) {
  0         0  
284              
285 0         0 my $key = decode_base64($row->{key});
286 0         0 my @cols = ();
287              
288 0         0 foreach my $c (@{$row->{Cell}}) {
  0         0  
289 0         0 my $name = decode_base64($c->{column});
290 0         0 my $value = decode_base64($c->{'$'});
291 0         0 my $ts = $c->{timestamp};
292 0         0 push @cols, {name => $name, value => $value, timestamp => $ts};
293             }
294 0         0 push @rows, {row => $key, columns => \@cols};
295             }
296              
297 0         0 return \@rows;
298             }
299              
300             # -------------------------------------------------------------------------
301             #
302             # put:
303             #
304             # IN: HASH => {
305             # table => $table,
306             # changes => [ # array of hashes, where each hash is one row
307             # ...,
308             # {
309             # row_key => "$row_key",
310             # row_cells => [
311             # {
312             # column => "$family:$name",
313             # value => "$value",
314             # timestamp => $timestamp # <- optional (override HBase timestamp)
315             # },
316             # ...,
317             # { column => "$family:$name", value => "$value" },
318             # ],
319             # },
320             # ...
321             # ]
322             # }
323             #
324             # OUT: result flag
325             sub put {
326 0     0 1 0 my $self = shift;
327 0         0 my $command = shift;
328              
329             # at least one valid record
330 0 0 0     0 unless ($command->{table} &&
      0        
331             (defined $command->{changes}->[0]->{row_key}) &&
332             $command->{changes}->[0]->{row_cells}) {
333 0         0 die q/Must provide required parameters:
334             IN: HASH => {
335             table => $table,
336             changes => [
337             ...,
338             {
339             row_key => "$row_key",
340             row_cells => [
341             { column => 'family:name', value => 'value' },
342             ...
343             { column => 'family:name', value => 'value' },
344             ],
345             },
346             ...
347             ]
348             };
349             /;
350             }
351              
352 0         0 my $table = $command->{table};
353              
354             # build JSON:
355 0         0 my $JSON_Command .= '{"Row":[';
356 0         0 my @sorted_json_row_changes = ();
357 0         0 foreach my $row_change (@{$command->{changes}}) {
  0         0  
358              
359 0         0 my $row_cell_changes = $row_change->{row_cells};
360              
361 0         0 my $rows = [];
362 0         0 my $row_change_formated = { Row => $rows };
363 0         0 my $row_cell_changes_formated = {};
364              
365             # hbase wants keys in sorted order; it wont work otherwise;
366             # more specificaly, the special key '$' has to be at the end;
367             my $sorted_json_row_change =
368             q|{"key":"|
369 0         0 . encode_base64($row_change->{row_key}, '')
370             . q|","Cell":[|
371             ;
372              
373 0         0 my @sorted_json_cell_changes = ();
374 0         0 foreach my $cell_change (@$row_cell_changes) {
375              
376             # timestamp override
377 0         0 my $ts;
378 0         0 my $overide_timestamp = undef;
379 0 0       0 if ($cell_change->{timestamp}) {
380 0         0 $overide_timestamp = 1;
381 0         0 $ts = $cell_change->{timestamp};
382             }
383              
384 0 0       0 my $timestamp_override_string = $overide_timestamp
385             ? '"timestamp":"' . $ts . '",'
386             : ''
387             ;
388              
389             my $sorted_json_cell_change =
390             '{'
391             . $timestamp_override_string
392             . '"column":"'
393             . encode_base64($cell_change->{column}, '')
394             . '",'
395             . '"$":"'
396 0         0 . encode_base64($cell_change->{value}, '')
397             . '"}'
398             ;
399              
400 0         0 push @sorted_json_cell_changes, $sorted_json_cell_change;
401              
402             } # next Cell
403              
404 0         0 $sorted_json_row_change .= join(",", @sorted_json_cell_changes);
405 0         0 $sorted_json_row_change .= ']}';
406              
407 0         0 push @sorted_json_row_changes, $sorted_json_row_change;
408              
409             } # next Row
410              
411 0         0 $JSON_Command .= join(",", @sorted_json_row_changes);
412 0         0 $JSON_Command .= ']}';
413              
414 0         0 my $route = '/' . uri_escape($table) . '/false-row-key';
415 0         0 my $uri = $self->{service} . $route;
416              
417 0         0 my $rs = $self->{http_tiny}->request('PUT', $uri, {
418             content => $JSON_Command,
419             headers => {
420             'Accept' => 'application/json',
421             'content-type' => 'application/json'
422             },
423             });
424              
425 0         0 return !$self->_handle_error( $uri, $rs );
426              
427             }
428              
429             # =========================================================================
430             # delete: delete an entire record or selected columns of it
431             #
432             # Usage:
433             # my $success = $hbh->delete({
434             # table => 'table',
435             # key => 'key',
436             # family => 'family', # optional, unless column is given
437             # column => 'column', # optional
438             # })
439             sub delete {
440 0     0 1 0 my ($self, $attr) = @_;
441 0         0 my $table = delete $attr->{table};
442 0         0 my $key = delete $attr->{key};
443 0         0 my $family = delete $attr->{family};
444 0         0 my $column = delete $attr->{column};
445 0         0 my ($route, $rs, $url);
446              
447 0 0       0 die "Table name required" if(!$table);
448 0 0       0 die "Row key required" if(!$key);
449 0 0 0     0 die "Family is required if column is given" if($column && !$family);
450              
451 0 0       0 $key = join(';', @$key) if(ref($key) eq 'ARRAY');
452 0         0 $route = sprintf("/%s/%s", $table, uri_escape($key));
453              
454 0 0       0 if($family) {
455 0         0 $route .= sprintf("/%s", $family);
456 0 0       0 $route .= sprintf(":%s", $column) if($column);
457             }
458              
459 0         0 $url = sprintf("%s%s", $self->{service}, $route);
460 0         0 $rs = $self->{http_tiny}->delete($url, {
461             headers => {
462             'Accept' => 'application/json',
463             }
464             });
465              
466 0         0 return !$self->_handle_error( $url, $rs );
467              
468             }
469              
470             # -------------------------------------------------------------------------
471             # build get uri
472             #
473             sub _build_get_uri {
474 7     7   501 my $query = shift;
475              
476 7         6 my $table = $query->{table};
477              
478 7         7 my $timestamp_url_part = undef;
479             # timestamp range query is supported only if columns are specifed
480 7 100 66     17 if ($query->{columns} and @{$query->{columns}}) {
  4         13  
481 4 100 66     16 if ( $query->{timestamp_range} and %{ $query->{timestamp_range} } ) {
  1         6  
482 1         2 my $timestamp_from = $query->{timestamp_range}->{from};
483 1         1 my $timestamp_until = $query->{timestamp_range}->{until};
484 1         3 $timestamp_url_part = "/" . $timestamp_from . "," . $timestamp_until;
485             }
486             }
487              
488 7         6 my $versions_url_part = undef;
489 7 100       14 if ( $query->{versions} ) {
490 4         5 my $versions = $query->{versions};
491 4         7 $versions_url_part = "?v=$versions";
492             }
493              
494 7         5 my $uri;
495 7 50       12 if ($query->{where}->{key_equals}) {
496 7         4 my $key = $query->{where}->{key_equals};
497 7         34 $uri = '/' . $table . '/' . uri_escape($key);
498             }
499             else {
500 0         0 my $part_of_key = $query->{where}->{key_begins_with};
501 0         0 $uri = '/' . $table . '/' . uri_escape($part_of_key . '*');
502             }
503              
504 7         78 my @get_urls = ();
505 7 100 66     17 if ( $query->{columns} and @{$query->{columns}} ) {
  4         9  
506 4         3 my $current_url = undef;
507 4         2 foreach my $column ( @{$query->{columns}} ) {
  4         9  
508 11 100       91 if (! defined $current_url) {
509 4   33     11 $current_url ||= $uri . "/" . uri_escape($column);
510             }
511             else{
512 7         14 my $next_url = $current_url . ',' . uri_escape($column);
513 7 100       75 if (length($next_url) < 1500) {
514 4         5 $current_url = $next_url;
515             }
516             else {
517 3         5 push @get_urls, { url => $current_url, len => length($current_url) };
518 3         6 $current_url = $uri . "/" . uri_escape($column);
519             }
520             }
521             }
522             # last batch
523 4         18 push @get_urls, { url => $current_url, len => length($current_url) };
524             } else {
525 3         6 push @get_urls, { url => $uri, len => length($uri) };
526             }
527              
528 7 100 100     25 if ( $timestamp_url_part || $versions_url_part ) {
529 4         4 foreach my $get_url (@get_urls) {
530 4 100       7 $get_url->{url} .= $timestamp_url_part if $timestamp_url_part;
531 4 50       10 $get_url->{url} .= $versions_url_part if $versions_url_part;
532             }
533             }
534              
535 7         30 return \@get_urls;
536             }
537              
538             # -------------------------------------------------------------------------
539             # build multiget url
540             #
541             sub _build_multiget_uri {
542 3     3   4 my $query = shift;
543              
544 3         4 my $keys = $query->{where}->{key_in};
545 3         3 my $table = $query->{table};
546              
547 3         6 my $uri_base = '/' . $table . '/multiget?';
548              
549 3         3 my @multiget_urls = ();
550 3         2 my $current_url = undef;
551 3         5 foreach my $key (@$keys) {
552 15 100       98 if (! defined $current_url) {
553 3   33     14 $current_url ||= $uri_base . "row=" . uri_escape($key);
554             }
555             else{
556 12         30 my $next_url = $current_url . '&row=' . uri_escape($key);
557 12 100       117 if (length($next_url) < 2000) {
558 9         10 $current_url = $next_url;
559             }
560             else {
561 3         7 push @multiget_urls, { url => $current_url, len => length($current_url) };
562 3         7 $current_url = $uri_base . "row=" . uri_escape($key);
563             }
564             }
565             }
566             # last batch
567 3         19 push @multiget_urls, { url => $current_url, len => length($current_url) };
568              
569 3 100       6 if ($query->{versions}) {
570 1         2 foreach my $mget_url (@multiget_urls) {
571 1         2 my $versions = $query->{versions};
572 1         2 my $versions_url_part = "v=$versions";
573              
574 1         2 $mget_url->{url} .= '&' . $versions_url_part;
575             }
576             }
577              
578 3         11 return \@multiget_urls;
579             }
580              
581             # -------------------------------------------------------------------------
582             # Handles the error response:
583             # 1) Replaces $self->{last_error} with a one parsed from the response (that can be undef)
584             # 2) If there is a error, returns true in non-strict mode. In strict mode dies on error unless
585             # its type is given not-to-die otherwise returns true
586             #
587             sub _handle_error {
588              
589 0     0     my ( $self, $uri, $response, $not_to_die ) = @_;
590              
591 0 0         if ( my $error = $self->{last_error} = _extract_error_tiny($uri, $response) ) {
592              
593 0 0         if ( $self->{strict_mode} ) {
594              
595             die "request error: " . Dumper( $error ) unless $error->{type}
596             and $not_to_die
597 0 0 0       and grep { $_ eq $error->{type} } @$not_to_die ;
  0   0        
598              
599             }
600              
601 0           return 1;
602             }
603              
604             }
605              
606             # -------------------------------------------------------------------------
607             # parse error
608             #
609             sub _extract_error_tiny {
610              
611 0     0     my $uri = shift;
612 0           my $res = shift;
613              
614 0 0         return if $res->{success};
615              
616 0           my $detailed_error_info = {reason => $res->{reason}, content => $res->{content}, status => $res->{status}};
617              
618 0 0         if ( my $http_status = $res->{status} ){
619              
620 0 0         if ( $http_status == 404 ) {
    0          
    0          
621             return {
622 0           type => '404',
623             info => 'A subset of keys you\'ve requested was not found. Or: no data has been written, if you were writing',
624             guess => 'Non-existing table, subset of keys or an exceeded quota?', #at the time of this writing, HBase REST servers send you a 404 when you're over quota, reading. This requires a fix on HBase side, no way to work around this here.
625             uri => $uri
626             };
627             } elsif ( $http_status == 599 ) {
628              
629             return {
630 0           type => '599',
631             info => 'Timeout',
632             uri => $uri,
633             error_details => $detailed_error_info
634             };
635              
636             } elsif ( $http_status == 503 ){
637              
638 0           my @lines = split /\n/, $res->{content};
639              
640 0           foreach my $line (@lines)
641             {
642 0 0         if (index($line, 'ThrottlingException') != -1)
643             {
644             # TODO: type => 404 above, how do we make it consistent?
645 0           return {type => '503', exception => 'QuotaExceededException', info => $line, uri => $uri, error_details => $detailed_error_info};
646             }
647             }
648              
649             }
650              
651             } else {
652              
653             return {
654 0           type => 'Unknown',
655             info => 'No status in response',
656             uri => $uri,
657             http_response => $res,
658             };
659              
660             }
661              
662 0           my $msg;
663 0 0         if ($res->{reason}) {
664 0           $msg = $res->{reason};
665             }
666             else {
667             return {
668 0           type => "Bad response",
669             info => "No reason in the response",
670             uri => $uri,
671             error_details => $detailed_error_info
672             };
673             }
674              
675 0           my ($exception, $info) = $msg =~ m{\.([^\.]+):(.*)$};
676 0 0         if ($exception) {
677 0           $exception =~ s{Exception$}{};
678             } else {
679 0           $exception = 'Unknown';
680 0   0       $info = $msg || $res->{status} || Dumper($res);
681             }
682 0           return { type => $exception, info => $info, uri => $uri, error_details => $detailed_error_info };
683             }
684              
685             sub _maybe_decompress {
686 0     0     my $rs = shift;
687              
688 0 0 0       if ( exists $rs->{headers}
      0        
689             && exists $rs->{ headers }->{ 'content-encoding' }
690             && $rs->{ headers }->{ 'content-encoding' } eq 'gzip' ) {
691 0           my $content = $rs->{content};
692 0           my ( $content_decompressed, $scalar, $GunzipError );
693 0 0         gunzip \$content => \$content_decompressed,
694             MultiStream => 1, Append => 1, TrailingData => \$scalar
695             or die "gunzip failed: $GunzipError\n";
696              
697 0           $rs->{content} = $content_decompressed;
698             }
699             }
700              
701             1;
702              
703             __END__