File Coverage

blib/lib/HBase/JSONRest/Scanner.pm
Criterion Covered Total %
statement 31 133 23.3
branch 2 52 3.8
condition 8 72 11.1
subroutine 6 11 54.5
pod 2 2 100.0
total 49 270 18.1


line stmt bran cond sub pod time code
1             package HBase::JSONRest::Scanner;
2              
3 1     1   407 use strict;
  1         2  
  1         21  
4 1     1   3 use warnings;
  1         1  
  1         18  
5              
6 1     1   3 use URI::Escape;
  1         1  
  1         45  
7 1     1   4 use Time::HiRes qw(time);
  1         1  
  1         7  
8 1     1   82 use Data::Dumper;
  1         1  
  1         902  
9              
10             # new
11             sub new {
12 0     0 1 0 my $class = shift;
13 0         0 my $params = shift;
14              
15             die "HBase handle required!"
16 0 0 0     0 unless ($params->{hbase} and (ref $params->{hbase}));
17              
18 0         0 my $hbase = $params->{hbase};
19              
20 0   0     0 my $limit = $params->{atatime} || 1;
21              
22             my $self = {
23             hbase => $hbase,
24              
25             table => $params->{table},
26              
27             startrow => $params->{startrow},
28              
29             endrow => $params->{endrow},
30              
31             endtime => $params->{endtime},
32              
33             prefix => $params->{prefix},
34              
35 0         0 limit => $limit,
36              
37             last_key_from_previous_batch => undef,
38              
39             batch_no => 0,
40              
41             EOF => 0,
42             };
43              
44 0         0 return bless $self, $class;
45             }
46              
47             # get_next_batch
48             sub get_next_batch {
49              
50 0     0 1 0 my $self = shift;
51              
52 0         0 $self->{_last_batch_time_start} = time;
53              
54 0         0 my $table = $self->{table};
55 0         0 my $prefix = $self->{prefix};
56 0         0 my $limit = $self->{limit};
57 0         0 my $hbase = $self->{hbase};
58              
59 0         0 my $last_key_from_previous_batch;
60              
61             # Three ways of scanning are supported:
62             #
63             # I. Provide a prefix and scan all rows with that prefix
64             # II. Provide startrow and endrow. Scan is inclusive for
65             # startrow and exclusive for endrow.
66             # III. Provide just startrow - scan entire table, batch by batch.
67             #
68             # All of these are converted to startrow and end_condition under
69             # the hood. Difference is only in user API.
70              
71             # First Batch
72 0 0       0 if ($self->{batch_no} == 0) {
73              
74             # Case I:
75 0 0 0     0 if ((defined $prefix) && !$self->{startrow} && !$self->{endrow}) {
    0 0        
    0 0        
    0 0        
    0 0        
      0        
      0        
      0        
      0        
76              
77 0         0 my $first_row = $self->_get_first_row_of_prefix();
78              
79             # no rows for specified prefix
80 0 0 0     0 return undef if (!$first_row && !$first_row->{row});
81              
82 0         0 $self->{startrow} = $first_row->{row};
83 0         0 $self->{end_condition_type} = 'PREFIX';
84             }
85             # Case II:
86             # case no prefix, startrow exists, endrow exists
87             elsif ((!defined $prefix) && $self->{startrow} && $self->{endrow}){
88             # $self->{startrow} allready assigned
89 0         0 $self->{end_condition_type} = 'ENDROW';
90             }
91             # Case III:
92             # only firs_key specified, scan untill the end of the table
93             elsif ((!defined $prefix) && $self->{startrow} && !$self->{endrow}){
94             # $self->{startrow} allready assigned
95 0         0 $self->{end_condition_type} = 'NONE';
96             }
97             # Forbiden cases:
98             # case prefix and startrow/endrow
99             elsif ((defined $prefix) && ($self->{startrow} || $self->{endrow})){
100 0         0 die "Can not use prefix and startrow/endrow at the same time!";
101             }
102             # case no params
103             elsif ((!defined $prefix) && !$self->{startrow}) {
104 0         0 die "Must specify either prefix or startrow!";
105             }
106             else {
107 0         0 die "Unknown query case!";
108             }
109              
110             # SCAN FOR FIRST BATCH
111             my $rows = $self->_scan_raw({
112             table => $self->{table},
113             startrow => $self->{startrow}, # <- inclusive
114 0         0 limit => $limit,
115             });
116 0         0 $self->{last_batch_time} = time - $self->{_last_batch_time_start};
117 0         0 $self->{batch_no}++;
118              
119 0 0       0 if (!$hbase->{last_error}) {
120              
121 0 0 0     0 if ($rows && @$rows) {
122              
123 0         0 $self->_filter_rows_beyond_last_key($rows);
124              
125             # return what is left, if something is left after filter
126 0 0 0     0 if ($rows && @$rows) {
127 0         0 $self->{last_key_from_previous_batch} = $rows->[-1]->{row};
128 0         0 return $rows;
129             }
130             else {
131 0         0 $self->{last_key_from_previous_batch} = undef;
132 0         0 $self->{EOF} = 1;
133 0         0 return [];
134             }
135             }
136             else {
137 0         0 $self->{last_key_from_previous_batch} = undef;
138 0         0 $self->{EOF} = 1;
139 0         0 return [];
140             }
141             }
142             else {
143 0         0 die "Error while trying to get the first key of a prefix!" . Dumper($hbase->{last_error});
144             }
145             }
146             # Next Batch
147             else {
148             # no more records, last batch was empty or it was the last batch
149 0 0 0     0 if (!$self->{last_key_from_previous_batch} || $self->{EOF}) {
150 0         0 return undef;
151             }
152              
153 0         0 $last_key_from_previous_batch = $self->{last_key_from_previous_batch};
154 0         0 $self->{last_key_from_previous_batch} = undef;
155              
156             # Use last row from previous batch as start row for the next scan, but
157             # make an exclude-start-row scan type.
158              
159 0         0 my $next_batch = $self->_scan_raw({
160             table => $table,
161             startrow => $last_key_from_previous_batch,
162             exclude_startrow_from_result => 1,
163             limit => $limit,
164             });
165              
166 0         0 $self->{last_batch_time} = time - $self->{_last_batch_time_start};
167 0         0 $self->{batch_no}++;
168              
169 0 0       0 if (!$hbase->{last_error}) {
170              
171 0 0 0     0 if ($next_batch && @$next_batch) {
172              
173 0         0 $self->_filter_rows_beyond_last_key($next_batch);
174              
175             # return what is left, if something is left after filter
176 0 0 0     0 if ($next_batch && @$next_batch) {
177 0         0 $self->{last_key_from_previous_batch} = $next_batch->[-1]->{row};
178 0         0 return $next_batch;
179             }
180             else {
181 0         0 $self->{last_key_from_previous_batch} = undef;
182 0         0 $self->{EOF} = 1;
183 0         0 return [];
184             }
185             }
186             else {
187 0         0 $self->{last_key_from_previous_batch} = undef;
188 0         0 $self->{EOF} = 1;
189 0         0 return [];
190             }
191             }
192             else {
193             die "Scanner error while trying to get next batch!"
194 0         0 . Dumper($hbase->{last_error});
195             }
196             }
197             }
198              
199             # _get_first_row_of_prefix
200             sub _get_first_row_of_prefix {
201 0     0   0 my $self = shift;
202              
203 0         0 my $prefix = $self->{prefix};
204 0         0 my $hbase = $self->{hbase};
205 0         0 my $table = $self->{table};
206              
207             # use prefix as the first row with limit 1 - returns the first row with given prefix
208 0         0 my $rows = $self->_scan_raw({
209             table => $table,
210             startrow => $prefix,
211             limit => 1,
212             });
213              
214 0 0       0 die "Should be only one first row!"
215             if ( scalar @$rows > 1);
216              
217 0 0       0 return undef unless $rows->[0];
218              
219 0         0 my $first_row = $rows->[0];
220              
221 0         0 return $first_row;
222             }
223              
224             # _scan_raw (uses passed paremeters instead of instance parameters)
225             sub _scan_raw {
226 0     0   0 my $self = shift;
227 0         0 my $params = shift;
228              
229 0         0 my $hbase = $self->{hbase};
230 0         0 $hbase->{last_error} = undef;
231              
232 0         0 $params->{endtime} = $self->{endtime};
233              
234 0         0 my $scan_uri = _build_scan_uri($params);
235              
236 0         0 my $rows = $hbase->_get_tiny($scan_uri);
237              
238 0         0 return $rows;
239             }
240              
241             sub _build_scan_uri {
242 1     1   1 my $params = shift;
243              
244             #
245             # request parameters:
246             #
247             # 1. startrow - The start row for the scan.
248             # 2. endrow - The end row for the scan.
249             # 4. starttime, endtime - To only retrieve columns within a specific range of version timestamps, both start and end time must be specified.
250             # 5. maxversions - To limit the number of versions of each column to be returned.
251             # 6. limit - The number of rows to return in the scan operation.
252              
253 1         2 my $table = $params->{table};
254 1   50     7 my $limit = $params->{limit} || 1;
255              
256             # optional
257 1   50     3 my $startrow = $params->{startrow} || "";
258 1   50     4 my $endrow = $params->{endrow} || "";
259 1   50     5 my $endtime = $params->{endtime} || "";
260              
261             # not supported yet:
262 1   50     7 my $columns = $params->{columns} || "";
263 1   50     6 my $starttime = $params->{starttime} || "";
264 1   50     4 my $maxversions = $params->{maxversions} || "";
265              
266             # option to do scans with exclusion of first row. Usefull when
267             # scanning for the next batch based on the last key from previous
268             # batch. By default this option is false.
269 1   50     3 my $exclude_startrow = $params->{exclude_startrow_from_result} || 0;
270              
271 1         1 my $uri;
272              
273 1 50       3 if ($exclude_startrow) {
274 0         0 $startrow = uri_escape($startrow) . uri_escape(chr(0));
275             }
276             else {
277 1         3 $startrow = uri_escape($startrow);
278             }
279 1         24 $uri
280             = "/"
281             . uri_escape($table)
282             . "/"
283             . '*?'
284             . "startrow=" . $startrow
285             . "&limit=" . $limit
286             ;
287              
288 1 50       13 $uri .= "&endtime=" . $endtime if $endtime;
289              
290 1         3 return $uri;
291             }
292              
293             sub _filter_rows_beyond_last_key {
294 0     0     my $self = shift;
295 0           my $rows = shift;
296              
297 0           my $last_retrieved_row = $rows->[-1]->{row};
298              
299 0 0         if ($self->{end_condition_type} eq 'PREFIX') {
    0          
    0          
300 0           my $prefix_end = $self->{prefix} . chr(255);
301 0 0         if ($last_retrieved_row gt $prefix_end) {
302             # need to filter out surpluss of rows
303 0           @$rows = grep { $_->{row} le $prefix_end } @$rows;
  0            
304             # also mark EOF
305 0           $self->{EOF} = 1;
306 0 0 0       if ($rows && @$rows) {
307 0           my $last_retrieved_valid_row = $rows->[-1]->{row};
308 0           $self->{last_key_from_previous_batch} = $last_retrieved_valid_row;
309             }
310 0           return;
311             }
312             }
313             elsif ($self->{end_condition_type} eq 'ENDROW') {
314 0 0         if ($last_retrieved_row ge $self->{endrow}) {
315             # need to filter out surpluss of rows
316 0           @$rows = grep { $_->{row} lt $self->{endrow} } @$rows;
  0            
317             # also mark EOF
318 0           $self->{EOF} = 1;
319 0 0 0       if ($rows && @$rows) {
320 0           my $last_retrieved_valid_row = $rows->[-1]->{row};
321 0           $self->{last_key_from_previous_batch} = $last_retrieved_valid_row;
322             }
323 0           return;
324             }
325             }
326             elsif ($self->{end_condition_type} eq 'NONE') {
327 0           return;
328             }
329             else {
330 0           die 'Unknown end_condition_type!';
331             }
332             }
333              
334             1;
335              
336             __END__