File Coverage

blib/lib/HBase/JSONRest/Scanner.pm
Criterion Covered Total %
statement 33 134 24.6
branch 4 56 7.1
condition 5 66 7.5
subroutine 6 11 54.5
pod 2 2 100.0
total 50 269 18.5


line stmt bran cond sub pod time code
1             package HBase::JSONRest::Scanner;
2              
3 1     1   478 use strict;
  1         3  
  1         27  
4 1     1   4 use warnings;
  1         2  
  1         32  
5              
6 1     1   7 use URI::Escape;
  1         3  
  1         74  
7 1     1   8 use Time::HiRes qw(time);
  1         3  
  1         10  
8 1     1   120 use Data::Dumper;
  1         3  
  1         1018  
9              
10             # new
# new
#
# Constructor. Takes a single hashref of options:
#   hbase       - required; must be a reference (the HBase REST handle that
#                 provides _get_tiny and last_error), otherwise dies with
#                 "HBase handle required!".
#   table, startrow, endrow, starttime, endtime, maxversions, prefix
#               - copied into the object as given (undef when absent).
#                 Per the original notes, the server defaults are
#                 starttime 0, endtime Long.MAX_VALUE, and maxversions 1.
#   atatime     - rows per batch; stored as 'limit', defaulting to 1 when
#                 false or absent.
# Returns the blessed scanner with its cursor state reset.
sub new {
    my ($class, $params) = @_;

    my $hbase = $params->{hbase};
    die "HBase handle required!"
        unless $hbase && ref $hbase;

    my %scanner = (
        hbase                        => $hbase,
        limit                        => $params->{atatime} || 1,
        last_key_from_previous_batch => undef,
        batch_no                     => 0,
        EOF                          => 0,
    );

    # Scan options are passed through verbatim; a missing option stays undef.
    for my $option (qw(table startrow endrow starttime endtime maxversions prefix)) {
        $scanner{$option} = $params->{$option};
    }

    return bless \%scanner, $class;
}
50              
51             # get_next_batch
sub get_next_batch {
    my $self = shift;

    # get_next_batch
    #
    # Returns the next batch of at most 'limit' rows as an arrayref of row
    # hashes (each carrying a 'row' key).
    #   - Returns [] when the batch just fetched turns out to be empty after
    #     end-condition filtering. The scanner is then marked EOF.
    #   - Returns undef once the scan is finished, or on the first call when
    #     a prefix has no rows.
    #   - Dies if the REST handle reports last_error, or if the scan
    #     parameters are inconsistent.
    #
    # Three ways of scanning are supported:
    #
    # I.   Provide a prefix and scan all rows with that prefix.
    # II.  Provide startrow and endrow. Scan is inclusive for startrow and
    #      exclusive for endrow.
    # III. Provide just startrow - scan the rest of the table, batch by batch.
    #
    # All of these are converted to startrow plus an end_condition_type
    # under the hood; the difference is only in the user API.

    $self->{_last_batch_time_start} = time;

    my $prefix = $self->{prefix};
    my $limit  = $self->{limit};

    if ($self->{batch_no} == 0) {
        # First batch: validate the options and resolve the start row.
        # "0" is a valid row key, so presence is tested with
        # _scanner_has_value rather than plain truthiness.
        my $has_startrow = _scanner_has_value($self->{startrow});
        my $has_endrow   = _scanner_has_value($self->{endrow});

        if (defined $prefix) {
            die "Can not use prefix and startrow/endrow at the same time!"
                if $has_startrow || $has_endrow;

            # Case I: start at the first row carrying the prefix.
            my $first_row = $self->_get_first_row_of_prefix();

            # no rows for specified prefix
            return undef
                if !$first_row || !_scanner_has_value($first_row->{row});

            $self->{startrow}           = $first_row->{row};
            $self->{end_condition_type} = 'PREFIX';
        }
        elsif (!$has_startrow) {
            die "Must specify either prefix or startrow!";
        }
        elsif ($has_endrow) {
            # Case II: startrow already set; stop before endrow.
            $self->{end_condition_type} = 'ENDROW';
        }
        else {
            # Case III: startrow already set; scan until the end of the table.
            $self->{end_condition_type} = 'NONE';
        }

        my $rows = $self->_scan_raw({
            table       => $self->{table},
            startrow    => $self->{startrow},    # <- inclusive
            starttime   => $self->{starttime},
            endtime     => $self->{endtime},
            maxversions => $self->{maxversions},
            limit       => $limit,
        });

        return $self->_finish_batch($rows,
            "Error while trying to get the first key of a prefix!");
    }

    # Next batch: nothing more to read if the previous batch was the last
    # one (EOF) or left no cursor behind.
    return undef
        if !_scanner_has_value($self->{last_key_from_previous_batch})
        || $self->{EOF};

    # Consume the cursor before scanning, so a scan that dies leaves the
    # scanner finished rather than retrying the same key.
    my $last_key_from_previous_batch = $self->{last_key_from_previous_batch};
    $self->{last_key_from_previous_batch} = undef;

    # Resume from the previous batch's last row, excluding that row itself.
    my $next_batch = $self->_scan_raw({
        table                        => $self->{table},
        startrow                     => $last_key_from_previous_batch,
        exclude_startrow_from_result => 1,
        starttime                    => $self->{starttime},
        endtime                      => $self->{endtime},
        maxversions                  => $self->{maxversions},
        limit                        => $limit,
    });

    return $self->_finish_batch($next_batch,
        "Scanner error while trying to get next batch!");
}

# _finish_batch
#
# Shared tail of get_next_batch. It records the batch timing and bumps
# batch_no, then dies with $error_message if the REST handle reported an
# error. Otherwise it trims rows past the end condition and either advances
# the cursor (returning the rows) or marks EOF (returning []).
sub _finish_batch {
    my ($self, $rows, $error_message) = @_;

    $self->{last_batch_time} = time - $self->{_last_batch_time_start};
    $self->{batch_no}++;

    my $hbase = $self->{hbase};
    die $error_message . Dumper($hbase->{last_error})
        if $hbase->{last_error};

    $self->_filter_rows_beyond_last_key($rows) if $rows && @$rows;

    # return what is left, if something is left after filter
    if ($rows && @$rows) {
        $self->{last_key_from_previous_batch} = $rows->[-1]->{row};
        return $rows;
    }

    $self->{last_key_from_previous_batch} = undef;
    $self->{EOF} = 1;
    return [];
}

# _scanner_has_value
#
# True when $value is defined and non-empty. Unlike plain truthiness this
# accepts the legitimate row key "0".
sub _scanner_has_value {
    my ($value) = @_;
    return defined $value && length $value;
}
208              
209             # _get_first_row_of_prefix
# _get_first_row_of_prefix
#
# Finds the first row whose key starts with $self->{prefix}. It scans from
# the prefix itself with limit 1, which yields the first row key >= prefix.
# Returns that row hash only if its key really carries the prefix, and undef
# otherwise (no rows for the prefix). Dies if the REST handle reports
# last_error, or if the server returns more than the one requested row.
sub _get_first_row_of_prefix {
    my $self = shift;

    my $prefix = $self->{prefix};
    my $hbase  = $self->{hbase};

    # use prefix as the first row with limit 1 - returns the first row >= prefix
    my $rows = $self->_scan_raw({
        table    => $self->{table},
        startrow => $prefix,
        limit    => 1,
    });

    die "Error while trying to get the first key of a prefix!"
        . Dumper($hbase->{last_error})
        if $hbase->{last_error};

    return undef unless $rows && @$rows;

    die "Should be only one first row!"
        if scalar @$rows > 1;

    my $first_row = $rows->[0];

    # The first key >= prefix may already be past the prefix range.
    return undef
        unless $first_row
        && defined $first_row->{row}
        && index($first_row->{row}, $prefix) == 0;

    return $first_row;
}
233              
234             # _scan_raw (uses passed paremeters instead of instance parameters)
# _scan_raw
#
# Performs one stateless scan request using the explicitly passed $params
# (see _build_scan_uri) rather than the instance's own options. It clears
# the handle's last_error first, so that afterwards last_error reflects only
# this request. Returns whatever the handle's _get_tiny yields for the scan
# URI, evaluated in scalar context; callers treat it as an arrayref of row
# hashes.
sub _scan_raw {
    my ($self, $params) = @_;

    my $handle = $self->{hbase};
    $handle->{last_error} = undef;

    return scalar $handle->_get_tiny(_build_scan_uri($params));
}
248              
# _build_scan_uri
#
# Builds the relative URI for a stateless HBase REST scan:
#   /<table>/*?startrow=<startrow>&limit=<limit>[&starttime=..][&endtime=..][&maxversions=..]
#
# Recognised $params keys:
#   table       - table name (URI-escaped).
#   limit       - number of rows to return; defaults to 1 when false.
#   startrow    - first row key (inclusive); empty when undef.
#   starttime, endtime
#               - version timestamp range, appended only when defined
#                 (server defaults: 0 and Long.MAX_VALUE).
#   maxversions - number of versions per column, appended only when
#                 defined (server default: 1).
#   exclude_startrow_from_result
#               - when true, the scan starts just after startrow. This is
#                 used to resume from the last key of the previous batch.
# The end of a scan (endrow/prefix) is enforced client-side by
# _filter_rows_beyond_last_key, so endrow and columns are not sent.
sub _build_scan_uri {
    my $params = shift;

    my $table = $params->{table};
    my $limit = $params->{limit} || 1;

    # "0" is a valid row key, so test definedness rather than truth.
    my $startrow = defined $params->{startrow} ? $params->{startrow} : "";

    my $starttime   = $params->{starttime};      # server's default is 0
    my $endtime     = $params->{endtime};        # server's default is Long.MAX_VALUE
    my $maxversions = $params->{maxversions};    # server's default is 1

    my $exclude_startrow = $params->{exclude_startrow_from_result} || 0;

    my $escaped_startrow = uri_escape($startrow);

    # key . "\0" is the immediate lexicographic successor of key, so
    # starting there skips exactly the start row itself.
    $escaped_startrow .= uri_escape(chr(0)) if $exclude_startrow;

    my $uri
        = "/"
        . uri_escape($table)
        . "/"
        . '*?'
        . "startrow=" . $escaped_startrow
        . "&limit=" . $limit
        ;

    $uri .= "&starttime=" . $starttime     if defined $starttime;
    $uri .= "&endtime=" . $endtime         if defined $endtime;
    $uri .= "&maxversions=" . $maxversions if defined $maxversions;

    return $uri;
}
302              
# _filter_rows_beyond_last_key
#
# Trims, in place, the rows of a batch that lie past the scan's end
# condition ($self->{end_condition_type}):
#   PREFIX - keep rows whose key starts with $self->{prefix}
#   ENDROW - keep rows whose key is lt $self->{endrow} (endrow is exclusive)
#   NONE   - keep everything
# Rows arrive sorted by key, so if the last row is still in range, all rows
# are. When trimming happens the scan has reached its end: EOF is set, and
# the last surviving key (if any) is stored in last_key_from_previous_batch.
# Dies on an unknown end_condition_type. Returns nothing.
sub _filter_rows_beyond_last_key {
    my $self = shift;
    my $rows = shift;

    my $type = $self->{end_condition_type};
    my $in_range;

    if ($type eq 'PREFIX') {
        my $prefix = $self->{prefix};
        # A real prefix test: a chr(255) upper bound would wrongly drop keys
        # such as "prefix\xff\x01" and characters above 255.
        $in_range = sub { index($_[0], $prefix) == 0 };
    }
    elsif ($type eq 'ENDROW') {
        my $endrow = $self->{endrow};
        $in_range = sub { $_[0] lt $endrow };
    }
    elsif ($type eq 'NONE') {
        return;
    }
    else {
        die 'Unknown end_condition_type!';
    }

    return unless $rows && @$rows;

    return if $in_range->($rows->[-1]->{row});

    # need to filter out surplus rows, and mark EOF
    @$rows = grep { $in_range->($_->{row}) } @$rows;
    $self->{EOF} = 1;

    if (@$rows) {
        $self->{last_key_from_previous_batch} = $rows->[-1]->{row};
    }

    return;
}
343              
344             1;
345              
346             __END__