File Coverage

blib/lib/Elasticsearch/Scroll.pm
Criterion Covered Total %
statement 9 41 21.9
branch 0 12 0.0
condition 0 9 0.0
subroutine 3 8 37.5
pod 4 5 80.0
total 16 75 21.3


line stmt bran cond sub pod time code
1             package Elasticsearch::Scroll;
2             $Elasticsearch::Scroll::VERSION = '1.05';
3 1     1   175414 use Moo;
  1         186661  
  1         12  
4 1     1   2973 use Elasticsearch::Util qw(parse_params);
  1         4  
  1         10  
5 1     1   2249 use namespace::clean;
  1         36962  
  1         8  
6              
7             has '_buffer' => ( is => 'ro' );
8              
9             with 'Elasticsearch::Role::Is_Sync', 'Elasticsearch::Role::Scroll';
10              
11             #===================================
12             sub BUILDARGS {
13             #===================================
14 0     0 0   my ( $class, $params ) = parse_params(@_);
15 0           my $es = delete $params->{es};
16 0   0       my $scroll = $params->{scroll} ||= '1m';
17 0           my $results = $es->search($params);
18              
19 0           my $total = $results->{hits}{total};
20              
21             return {
22 0 0         es => $es,
23             scroll => $scroll,
24             aggregations => $results->{aggregations},
25             facets => $results->{facets},
26             suggest => $results->{suggest},
27             took => $results->{took},
28             total_took => $results->{took},
29             total => $total,
30             max_score => $results->{hits}{max_score},
31             _buffer => $results->{hits}{hits},
32             $total
33             ? ( _scroll_id => $results->{_scroll_id} )
34             : ( is_finished => 1 )
35             };
36             }
37              
38             #===================================
39             sub next {
40             #===================================
41 0     0 1   my ( $self, $n ) = @_;
42 0   0       $n ||= 1;
43 0   0       while ( $self->_has_scroll_id and $self->buffer_size < $n ) {
44 0           $self->refill_buffer;
45             }
46 0           my @return = splice( @{ $self->_buffer }, 0, $n );
  0            
47 0 0         $self->finish if @return < $n;
48 0 0         return wantarray ? @return : $return[-1];
49             }
50              
51             #===================================
52             sub drain_buffer {
53             #===================================
54 0     0 1   my $self = shift;
55 0           return splice( @{ $self->_buffer } );
  0            
56             }
57              
58             #===================================
59 0     0 1   sub buffer_size { 0 + @{ shift->_buffer } }
  0            
60             #===================================
61              
62             #===================================
63             sub refill_buffer {
64             #===================================
65 0     0 1   my $self = shift;
66              
67 0 0         return 0 if $self->is_finished;
68              
69 0           my $buffer = $self->_buffer;
70 0   0       my $scroll_id = $self->_scroll_id
71             || return 0 + @$buffer;
72              
73 0           my $results = $self->es->scroll(
74             scroll => $self->scroll,
75             scroll_id => $scroll_id,
76             );
77              
78 0           my $hits = $results->{hits}{hits};
79 0           $self->_set_total_took( $self->total_took + $results->{took} );
80              
81 0 0         if ( @$hits == 0 ) {
82 0           $self->_clear_scroll_id;
83             }
84             else {
85 0           $self->_set__scroll_id( $results->{_scroll_id} );
86 0           push @$buffer, @$hits;
87             }
88 0 0         $self->finish if @$buffer == 0;
89 0           return 0 + @$buffer;
90             }
91              
92             #===================================
93             around 'finish' => sub {
94             #===================================
95             my ( $orig, $self ) = @_;
96             $orig->($self);
97             @{ $self->_buffer } = ();
98             1;
99             };
100              
101             1;
102              
103             =pod
104              
105             =encoding UTF-8
106              
107             =head1 NAME
108              
109             Elasticsearch::Scroll - A helper module for scrolled searches
110              
111             =head1 VERSION
112              
113             version 1.05
114              
115             =head1 SYNOPSIS
116              
117             use Elasticsearch;
118             use Elasticsearch::Scroll;
119              
120             my $es = Elasticsearch->new;
121              
122             my $scroll = Elasticsearch::Scroll->new(
123             es => $es,
124             index => 'my_index',
125             search_type => 'scan',
126             size => 500
127             );
128              
129             say "Total hits: ". $scroll->total;
130              
131             while (my $doc = $scroll->next) {
132             # do something
133             }
134              
135             =head1 DESCRIPTION
136              
137             A I<scrolled search> is a search that allows you to keep pulling results
138             until there are no more matching results, much like a cursor in an SQL
139             database.
140              
141             Unlike paginating through results (with the C<from> parameter in
142             L<search()|Elasticsearch::Client::Direct/search()>),
143             scrolled searches take a snapshot of the current state of the index. Even
144             if you keep adding new documents to the index or updating existing documents,
145             a scrolled search will only see the index as it was when the search began.
146              
147             This module is a helper utility that wraps the functionality of the
148             L<search()|Elasticsearch::Client::Direct/search()> and
149             L<scroll()|Elasticsearch::Client::Direct/scroll()> methods to make
150             them easier to use.
151              
152             B<IMPORTANT>: Deep scrolling can be expensive. See L</DEEP SCROLLING>
153             for more.
154              
155             This class does L<Elasticsearch::Role::Scroll> and
156             L<Elasticsearch::Role::Is_Sync>.
157              
158             =head1 USE CASES
159              
160             There are two primary use cases:
161              
162             =head2 Pulling enough results
163              
164             Perhaps you want to group your results by some field, and you don't know
165             exactly how many results you will need in order to return 10 grouped
166             results. With a scrolled search you can keep pulling more results
167             until you have enough. For instance, you can search emails in a mailing
168             list, and return results grouped by C<thread_id>:
169              
170             my (%groups,@results);
171              
172             my $scroll = Elasticsearch::Scroll->new(
173             es => $es,
174             index => 'my_emails',
175             type => 'email',
176             body => { query => {... some query ... }}
177             );
178              
179             my $doc;
180             while (@results < 10 and $doc = $scroll->next) {
181              
182             my $thread = $doc->{_source}{thread_id};
183              
184             unless ($groups{$thread}) {
185             $groups{$thread} = [];
186             push @results, $groups{$thread};
187             }
188             push @{$groups{$thread}},$doc;
189              
190             }
191              
192             =head2 Extracting all documents
193              
194             Often you will want to extract all (or a subset of) documents in an index.
195             If you want to change your type mappings, you will need to reindex all of your
196             data. Or perhaps you want to move a subset of the data in one index into
197             a new dedicated index. In these cases, you don't care about sort
198             order, you just want to retrieve all documents which match a query, and do
199             something with them. For instance, to retrieve all the docs for a particular
200             C<client_id>:
201              
202             my $scroll = Elasticsearch::Scroll->new(
203             es => $es,
204             index => 'my_index',
205             search_type => 'scan', # important!
206             size => 500,
207             body => {
208             query => {
209             match => {
210             client_id => 123
211             }
212             }
213             }
214             );
215              
216             while ( my $doc = $scroll->next ) {
217             # do something
218             }
219              
220             Very often the I<something> that you will want to do with these results
221             involves bulk-indexing them into a new index. The easiest way to
222             marry a scrolled search with bulk indexing is to use the
223             L<Elasticsearch::Bulk/reindex()> method.
224              
225             =head1 DEEP SCROLLING
226              
227             Deep scrolling (and deep pagination) are very expensive in a distributed
228             environment, and the reason they are expensive is that results need to
229             be sorted in a global order.
230              
231             For example, if we have an index with 5 shards, and we request the first
232             10 results, each shard has to return its top 10, and then the I<requesting
233             node> (the node that is handling the search request) has to re-sort these
234             50 results to return a global top 10. Now, if we request page 1,001
235             (i.e. results 10,001 .. 10,010), then each shard has to return 10,010 results,
236             and the requesting node has to sort through 50,050 results just to return
237             10 of them!
238              
239             You can see how this can get very heavy very quickly. This is the reason that
240             web search engines never return more than 1,000 results.
241              
242             =head2 Disable sorting for efficient scrolling
243              
244             The problem with deep scrolling is the sorting phase. If we disable sorting,
245             then we can happily scroll through millions of documents efficiently. The
246             way to do this is to set C<search_type> to C<scan>:
247              
248             $scroll = Elasticsearch::Scroll->new(
249             es => $es,
250             search_type => 'scan',
251             size => 500,
252             );
253              
254             Scanning disables sorting and will just return C<size> results from each
255             shard until there are no more results to return. B<Note>: this means
256             that, when querying an index with 5 shards, the scrolled search
257             will pull C<size * 5> results at a time. If you have large documents or
258             are memory constrained, you will need to take this into account.
259              
260             =head1 METHODS
261              
262             =head2 C<new()>
263              
264             use Elasticsearch;
265             use Elasticsearch::Scroll;
266              
267             my $es = Elasticsearch->new(...);
268             my $scroll = Elasticsearch::Scroll->new(
269             es => $es, # required
270             scroll => '1m', # optional
271             %search_params
272             );
273              
274             The C<new()> method returns a new C<$scroll> object. You must pass your
275             Elasticsearch client as the C<es> argument, and you can specify
276             a C<scroll> duration (which defaults to C<"1m">). Any other parameters
277             are passed directly to L<Elasticsearch::Client::Direct/search()>.
278              
279             The C<scroll> duration tells Elasticsearch how long it should keep the scroll
280             alive. B<Note>: this duration doesn't need to be long enough to process
281             all results, just long enough to process a single B<batch> of results.
282             The expiry gets renewed for another C<scroll> period every time
283             a new batch of results is retrieved from the cluster.
284              
285             =head2 C<next()>
286              
287             $doc = $scroll->next;
288             @docs = $scroll->next($num);
289              
290             The C<next()> method returns the next result, or the next C<$num> results
291             (pulling more results if required). If all results have been exhausted,
292             it returns an empty list.
293              
294             =head2 C<drain_buffer()>
295              
296             @docs = $scroll->drain_buffer;
297              
298             The C<drain_buffer()> method returns all of the documents currently in the
299             buffer, without fetching any more from the cluster.
300              
301             =head2 C<refill_buffer()>
302              
303             $total = $scroll->refill_buffer;
304              
305             The C<refill_buffer()> method fetches the next batch of results from the
306             cluster, stores them in the buffer, and returns the total number of docs
307             currently in the buffer.
308              
309             =head2 C<buffer_size()>
310              
311             $total = $scroll->buffer_size;
312              
313             The C<buffer_size()> method returns the total number of docs currently in
314             the buffer.
315              
316             =head2 C<finish()>
317              
318             $scroll->finish;
319              
320             The C<finish()> method clears out the buffer, sets L</is_finished()> to C<true>
321             and tries to clear the C<scroll_id> on Elasticsearch. This API is only
322             supported since v0.90.5, but the call to C<clear_scroll> is wrapped in an
323             C<eval> so the C<finish()> method can be safely called with any version
324             of Elasticsearch.
325              
326             When the C<$scroll> instance goes out of scope, L</finish()> is called
327             automatically if required.
328              
329             =head2 C<is_finished()>
330              
331             $bool = $scroll->is_finished;
332              
333             A flag which returns C<true> if all results have been processed or
334             L</finish()> has been called.
335              
336             =head1 INFO ACCESSORS
337              
338             The information from the original search is returned via the following
339             accessors:
340              
341             =head2 C<total>
342              
343             The total number of documents that matched your query.
344              
345             =head2 C<max_score>
346              
347             The maximum score of any document that matched your query.
348              
349             =head2 C<aggregations>
350              
351             Any aggregations that were specified, or C<undef>
352              
353             =head2 C<facets>
354              
355             Any facets that were specified, or C<undef>
356              
357             =head2 C<suggest>
358              
359             Any suggestions that were specified, or C<undef>
360              
361             =head2 C<took>
362              
363             How long the original search took, in milliseconds
364              
365             =head2 C<total_took>
366              
367             How long the original search plus all subsequent batches took, in milliseconds.
368              
369             =head1 SEE ALSO
370              
371             =over
372              
373             =item * L<Elasticsearch::Bulk/reindex()>
374              
375             =item * L<Elasticsearch::Client::Direct/search()>
376              
377             =item * L<Elasticsearch::Client::Direct/scroll()>
378              
379             =back
380              
381             =head1 AUTHOR
382              
383             Clinton Gormley <drtech@cpan.org>
384              
385             =head1 COPYRIGHT AND LICENSE
386              
387             This software is Copyright (c) 2014 by Elasticsearch BV.
388              
389             This is free software, licensed under:
390              
391             The Apache License, Version 2.0, January 2004
392              
393             =cut
394              
395             __END__
396              
397             # ABSTRACT: A helper module for scrolled searches
398