File Coverage

blib/lib/Elasticsearch/Bulk.pm
Criterion Covered Total %
statement 12 57 21.0
branch 0 16 0.0
condition 0 15 0.0
subroutine 4 10 40.0
pod 3 3 100.0
total 19 101 18.8


line stmt bran cond sub pod time code
1             package Elasticsearch::Bulk;
2             $Elasticsearch::Bulk::VERSION = '1.05';
3 4     4   235194 use Moo;
  4         82944  
  4         27  
4             with 'Elasticsearch::Role::Bulk', 'Elasticsearch::Role::Is_Sync';
5 4     4   9312 use Elasticsearch::Util qw(parse_params throw);
  4         14  
  4         34  
6 4     4   4589 use Try::Tiny;
  4         4130  
  4         250  
7 4     4   3229 use namespace::clean;
  4         41412  
  4         32  
8              
9             #===================================
10             sub add_action {
11             #===================================
12 0     0 1   my $self = shift;
13 0           my $buffer = $self->_buffer;
14 0           my $max_size = $self->max_size;
15 0           my $max_count = $self->max_count;
16              
17 0           while (@_) {
18 0           my @json = $self->_encode_action( splice( @_, 0, 2 ) );
19              
20 0           push @$buffer, @json;
21              
22 0           my $size = $self->_buffer_size;
23 0           $size += length($_) + 1 for @json;
24 0           $self->_buffer_size($size);
25              
26 0           my $count = $self->_buffer_count( $self->_buffer_count + 1 );
27              
28 0 0 0       $self->flush
      0        
      0        
29             if ( $max_size && $size >= $max_size )
30             || $max_count && $count >= $max_count;
31             }
32 0           return 1;
33             }
34              
35             #===================================
36             sub flush {
37             #===================================
38 0     0 1   my $self = shift;
39              
40 0 0         return { items => [] }
41             unless $self->_buffer_size;
42              
43 0 0         if ( $self->verbose ) {
44 0           local $| = 1;
45 0           print ".";
46             }
47 0           my $buffer = $self->_buffer;
48             my $results = try {
49 0     0     $self->es->bulk( %{ $self->_bulk_args }, body => $buffer );
  0            
50             }
51             catch {
52 0     0     my $error = $_;
53 0 0 0       $self->clear_buffer
54             if $error->is('Request')
55             and not $error->is('Unavailable');
56              
57 0           die $error;
58 0           };
59 0           $self->clear_buffer;
60 0           $self->_report( $buffer, $results );
61 0 0         return defined wantarray ? $results : undef;
62             }
63              
64             #===================================
65             sub reindex {
66             #===================================
67 0     0 1   my ( $self, $params ) = parse_params(@_);
68 0 0         my $src = $params->{source}
69             or throw( 'Param', "Missing required param <source>" );
70 0           $src = {%$src};
71              
72 0           my $transform = $self->_doc_transformer($params);
73              
74 0 0         if ( ref $src eq 'HASH' ) {
75 0   0       my $es = delete $src->{es} || $self->es;
76 0           my $scroll = $es->scroll_helper(
77             search_type => 'scan',
78             size => 500,
79             %$src
80             );
81              
82             $src = sub {
83 0     0     $scroll->refill_buffer;
84 0           $scroll->drain_buffer;
85 0           };
86              
87 0 0         print "Reindexing " . $scroll->total . " docs\n"
88             if $self->verbose;
89             }
90              
91 0           while ( my @docs = $src->() ) {
92 0           $self->index( grep {$_} map { $transform->($_) } @docs );
  0            
  0            
93             }
94 0           $self->flush;
95 0           return 1;
96             }
97              
98             1;
99              
100             =pod
101              
102             =encoding UTF-8
103              
104             =head1 NAME
105              
106             Elasticsearch::Bulk - A helper module for the Bulk API and for reindexing
107              
108             =head1 VERSION
109              
110             version 1.05
111              
112             =head1 SYNOPSIS
113              
114             use Elasticsearch;
115             use Elasticsearch::Bulk;
116              
117             my $es = Elasticsearch->new;
118             my $bulk = Elasticsearch::Bulk->new(
119             es => $es,
120             index => 'my_index',
121             type => 'my_type'
122             );
123              
124             # Index docs:
125             $bulk->index({ id => 1, source => { foo => 'bar' }});
126             $bulk->add_action( index => { id => 1, source => { foo=> 'bar' }});
127              
128             # Create docs:
129             $bulk->create({ id => 1, source => { foo => 'bar' }});
130             $bulk->add_action( create => { id => 1, source => { foo=> 'bar' }});
131             $bulk->create_docs({ foo => 'bar' });
132              
133             # Delete docs:
134             $bulk->delete({ id => 1});
135             $bulk->add_action( delete => { id => 1 });
136             $bulk->delete_ids(1,2,3);
137              
138             # Update docs:
139             $bulk->update({ id => 1, script => '...' });
140             $bulk->add_action( update => { id => 1, script => '...' });
141              
142             # Manual flush
143             $bulk->flush;
144              
145             # Reindex docs:
146             $bulk = Elasticsearch::Bulk->new(
147             es => $es,
148             index => 'new_index',
149             verbose => 1
150             );
151              
152             $bulk->reindex( source => { index => 'old_index' });
153              
154             =head1 DESCRIPTION
155              
156             This module provides a wrapper for the L<Elasticsearch::Client::Direct/bulk()>
157             method which makes it easier to run multiple create, index, update or delete
158             actions in a single request. It also provides a simple interface
159             for L<reindexing documents|/REINDEXING DOCUMENTS>.
160              
161             The L<Elasticsearch::Bulk> module acts as a queue, buffering up actions
162             until it reaches a maximum count of actions, or a maximum size of JSON request
163             body, at which point it issues a C<bulk()> request.
164              
165             Once you have finished adding actions, call L</flush()> to force the final
166             C<bulk()> request on the items left in the queue.
167              
168             This class does L<Elasticsearch::Role::Bulk> and
169             L<Elasticsearch::Role::Is_Sync>.
170              
171             =head1 CREATING A NEW INSTANCE
172              
173             =head2 C<new()>
174              
175             $bulk = Elasticsearch::Bulk->new(
176             es => $es, # required
177              
178             index => 'default_index', # optional
179             type => 'default_type', # optional
180             %other_bulk_params # optional
181              
182             max_count => 1_000, # optional
183             max_size => 1_000_000, # optional
184              
185             verbose => 0 | 1, # optional
186              
187             on_success => sub {...}, # optional
188             on_error => sub {...}, # optional
189             on_conflict => sub {...}, # optional
190              
191              
192             );
193              
194             The C<new()> method returns a new C<$bulk> object. You must pass your
195             Elasticsearch client as the C<es> argument.
196              
197             The C<index> and C<type> parameters provide default values for
198             C<index> and C<type>, which can be overridden in each action.
199             You can also pass any other values which are accepted
200             by the L<bulk()|Elasticsearch::Client::Direct/bulk()> method.
201              
202             See L</flush()> for more information about the other parameters.
203              
204             =head1 FLUSHING THE BUFFER
205              
206             =head2 C<flush()>
207              
208             $result = $bulk->flush;
209              
210             The C<flush()> method sends all buffered actions to Elasticsearch using
211             a L<bulk()|Elasticsearch::Client::Direct/bulk()> request.
212              
213             =head2 Auto-flushing
214              
215             An automatic L</flush()> is triggered whenever the C<max_count> or C<max_size>
216             threshold is breached. This causes all actions in the buffer to be
217             sent to Elasticsearch.
218              
219             =over
220              
221             =item * C<max_count>
222              
223             The maximum number of actions to allow before triggering a L</flush()>.
224             This can be disabled by setting C<max_count> to C<0>. Defaults to
225             C<1,000>.
226              
227             =item * C<max_size>
228              
229             The maximum size of JSON request body to allow before triggering a
230             L</flush()>. This can be disabled by setting C<max_size> to C<0>. Defaults
231             to C<1,000,000> bytes.
232              
233             =back
234              
235             =head2 Errors when flushing
236              
237             There are three levels of error which can be thrown when L</flush()>
238             is called, either manually or automatically.
239              
240             =over
241              
242             =item * Temporary Elasticsearch errors
243              
244             For instance, a C<NoNodes> error which indicates that your cluster is down.
245             These errors do not clear the buffer, as they can be retried later on.
246              
247             =item * Request errors
248              
249             For instance, if one of your actions is malformed (eg you are missing
250             a required parameter like C<index>) then the whole L</flush()> request is
251             aborted and the buffer is cleared of all actions.
252              
253             =item * Action errors
254              
255             Individual actions may fail. For instance, a C<create> action will fail
256             if a document with the same C<index>, C<type> and C<id> already exists.
257             These action errors are reported via L<callbacks|/Using callbacks>.
258              
259             =back
260              
261             =head2 Using callbacks
262              
263             By default, any I<Action errors> (see above) cause warnings to be
264             written to C<STDERR>. However, you can use the C<on_error>, C<on_conflict>
265             and C<on_success> callbacks for more fine-grained control.
266              
267             All callbacks receive the following arguments:
268              
269             =over
270              
271             =item C<$action>
272              
273             The name of the action, ie C<index>, C<create>, C<update> or C<delete>.
274              
275             =item C<$response>
276              
277             The response that Elasticsearch returned for this action.
278              
279             =item C<$i>
280              
281             The index of the action, ie the first action in the flush request
282             will have C<$i> set to C<0>, the second will have C<$i> set to C<1> etc.
283              
284             =back
285              
286             =head3 C<on_success>
287              
288             $bulk = Elasticsearch::Bulk->new(
289             es => $es,
290             on_success => sub {
291             my ($action,$response,$i) = @_;
292             # do something
293             },
294             );
295              
296             The C<on_success> callback is called for every action that has a successful
297             response.
298              
299             =head3 C<on_conflict>
300              
301             $bulk = Elasticsearch::Bulk->new(
302             es => $es,
303             on_conflict => sub {
304             my ($action,$response,$i,$version) = @_;
305             # do something
306             },
307             );
308              
309             The C<on_conflict> callback is called for actions that have triggered
310             a C<Conflict> error, eg trying to C<create> a document which already
311             exists. The C<$version> argument will contain the version number
312             of the document currently stored in Elasticsearch (if found).
313              
314             =head3 C<on_error>
315              
316             $bulk = Elasticsearch::Bulk->new(
317             es => $es,
318             on_error => sub {
319             my ($action,$response,$i) = @_;
320             # do something
321             },
322             );
323              
324             The C<on_error> callback is called for any error (unless the C<on_conflict>
325             callback has already been called).
326              
327             =head2 Disabling callbacks and autoflush
328              
329             If you want to be in control of flushing, and you just want to receive
330             the raw response that Elasticsearch sends instead of using callbacks,
331             then you can do so as follows:
332              
333             $bulk = Elasticsearch::Bulk->new(
334             es => $es,
335             max_count => 0,
336             max_size => 0,
337             on_error => undef
338             );
339              
340             $bulk->add_action(....);
341             $response = $bulk->flush;
342              
343             =head1 CREATE, INDEX, UPDATE, DELETE
344              
345             =head2 C<add_action()>
346              
347             $bulk->add_action(
348             create => { ...params... },
349             index => { ...params... },
350             update => { ...params... },
351             delete => { ...params... }
352             );
353              
354             The C<add_action()> method allows you to add multiple C<create>, C<index>,
355             C<update> and C<delete> actions to the queue. The first value is the action
356             type, and the second value is the parameters that describe that action.
357             See the individual helper methods below for details.
358              
359             B<Note:> Parameters like C<index> or C<type> can be specified as C<index> or as
360             C<_index>, so the following two lines are equivalent:
361              
362             index => { index => 'index', type => 'type', id => 1, source => {...}},
363             index => { _index => 'index', _type => 'type', _id => 1, _source => {...}},
364              
365             B<Note:> The C<index> and C<type> parameters can be specified in the
366             params for any action, but if not specified, will default to the C<index>
367             and C<type> values specified in L</new()>. These are required parameters:
368             they must be specified either in L</new()> or in every action.
369              
370             =head2 C<create()>
371              
372             $bulk->create(
373             { index => 'custom_index', source => { doc body }},
374             { type => 'custom_type', id => 1, source => { doc body }},
375             ...
376             );
377              
378             The C<create()> helper method allows you to add multiple C<create> actions.
379             It accepts the same parameters as L<Elasticsearch::Client::Direct/create()>
380             except that the document body should be passed as the C<source> or C<_source>
381             parameter, instead of as C<body>.
382              
383             =head2 C<create_docs()>
384              
385             $bulk->create_docs(
386             { doc body },
387             { doc body },
388             ...
389             );
390              
391             The C<create_docs()> helper is a shorter form of L</create()> which can be used
392             when you are using the default C<index> and C<type> as set in L</new()>
393             and you are not specifying a custom C<id> per document. In this case,
394             you can just pass the individual document bodies.
395              
396             =head2 C<index()>
397              
398             $bulk->index(
399             { index => 'custom_index', source => { doc body }},
400             { type => 'custom_type', id => 1, source => { doc body }},
401             ...
402             );
403              
404             The C<index()> helper method allows you to add multiple C<index> actions.
405             It accepts the same parameters as L<Elasticsearch::Client::Direct/index()>
406             except that the document body should be passed as the C<source> or C<_source>
407             parameter, instead of as C<body>.
408              
409             =head2 C<delete()>
410              
411             $bulk->delete(
412             { index => 'custom_index', id => 1},
413             { type => 'custom_type', id => 2},
414             ...
415             );
416              
417             The C<delete()> helper method allows you to add multiple C<delete> actions.
418             It accepts the same parameters as L<Elasticsearch::Client::Direct/delete()>.
419              
420             =head2 C<delete_ids()>
421              
422             $bulk->delete_ids(1,2,3...)
423              
424             The C<delete_ids()> helper method can be used when all of the documents you
425             want to delete have the default C<index> and C<type> as set in L</new()>.
426             In this case, all you have to do is to pass in a list of IDs.
427              
428             =head2 C<update()>
429              
430             $bulk->update(
431             { id => 1,
432             doc => { partial doc },
433             doc_as_upsert => 1
434             },
435             { id => 2,
436             lang => 'mvel',
437             script => 'ctx._source.counter+=incr',
438             params => { incr => 1},
439             upsert => { upsert doc }
440             },
441             ...
442             );
443              
444             The C<update()> helper method allows you to add multiple C<update> actions.
445             It accepts the same parameters as L<Elasticsearch::Client::Direct/update()>.
446             An update can either use a I<partial doc> which gets merged with an existing
447             doc (example 1 above), or can use a C<script> to update an existing doc
448             (example 2 above).
449              
450             =head1 REINDEXING DOCUMENTS
451              
452             A common use case for bulk indexing is to reindex a whole index when
453             changing the type mappings or analysis chain. This typically
454             combines bulk indexing with L<scrolled searches|Elasticsearch::Scroll>:
455             the scrolled search pulls all of the data from the source index, and
456             the bulk indexer indexes the data into the new index.
457              
458             =head2 C<reindex()>
459              
460             $bulk->reindex(
461             source => $source, # required
462             transform => \&transform, # optional
463             version_type => 'external|internal', # optional
464             );
465              
466             The C<reindex()> method requires a C<$source> parameter, which provides
467             the source for the documents which are to be reindexed.
468              
469             =head2 Reindexing from another index
470              
471             If the C<source> argument is a HASH ref, then the hash is passed to
472             L<Elasticsearch::Scroll/new()> to create a new scrolled search.
473              
474             $bulk = Elasticsearch::Bulk->new(
475             es => $es, index => 'new_index',
476             verbose => 1
477             );
478              
479             $bulk->reindex(
480             source => {
481             index => 'old_index',
482             size => 500, # default
483             search_type => 'scan' # default
484             }
485             );
486              
487             If a default C<index> or C<type> has been specified in the call to
488             L</new()>, then it will replace the C<index> and C<type> values for
489             the docs returned from the scrolled search. In the example above,
490             all docs will be retrieved from C<"old_index"> and will be bulk indexed
491             into C<"new_index">.
492              
493             =head2 Reindexing from a generic source
494              
495             The C<source> parameter also accepts a coderef or an anonymous sub,
496             which should return one or more new documents every time it is executed.
497             This allows you to pass any iterator, wrapped in an anonymous sub:
498              
499             my $iter = get_iterator_from_somewhere();
500              
501             $bulk->reindex(
502             source => sub { $iter->next }
503             );
504              
505             =head2 Transforming docs on the fly
506              
507             The C<transform> parameter allows you to change documents on the fly,
508             using a callback. The callback receives the document as the only argument,
509             and should return the updated document, or C<undef> if the document should
510             not be indexed:
511              
512             $bulk->reindex(
513             source => { index => 'old_index' },
514             transform => sub {
515             my $doc = shift;
516              
517             # don't index doc marked as valid:false
518             return undef unless $doc->{_source}{valid};
519              
520             # convert $tag to @tags
521             $doc->{_source}{tags} = [ delete $doc->{_source}{tag}];
522             return $doc
523             }
524             );
525              
526             =head2 Reindexing from another cluster
527              
528             By default, L</reindex()> expects the source and destination indices
529             to be in the same cluster. To pull data from one cluster and index it into
530             another, you can use two separate C<$es> objects:
531              
532             $es_target = Elasticsearch->new( nodes => 'localhost:9200' );
533             $es_source = Elasticsearch->new( nodes => 'search1:9200' );
534              
535             Elasticsearch::Bulk->new(
536             es => $es_target,
537             verbose => 1
538             )
539             -> reindex(
540             source => {
541             es => $es_source,
542             index => 'my_index'
543             }
544             );
545              
546             =head2 Parents and routing
547              
548             If you are using parent-child relationships or custom C<routing> values,
549             and you want to preserve these when you reindex your documents, then
550             you will need to request these values specifically, as follows:
551              
552             $bulk->reindex(
553             source => {
554             index => 'old_index',
555             fields => ['_source','_parent','_routing']
556             }
557             );
558              
559             =head2 Working with version numbers
560              
561             Every document in Elasticsearch has a current C<version> number, which
562             is used for L<optimistic concurrency control|http://en.wikipedia.org/wiki/Optimistic_concurrency_control>,
563             that is, to ensure that you don't overwrite changes that have been made
564             by another process.
565              
566             All CRUD operations accept a C<version> parameter and a C<version_type>
567             parameter which tells Elasticsearch that the change should only be made
568             if the current document corresponds to these parameters. The
569             C<version_type> parameter can have the following values:
570              
571             =over
572              
573             =item * C<internal>
574              
575             Use Elasticsearch version numbers. Documents are only changed if the
576             document in Elasticsearch has the B<same> C<version> number that is
577             specified in the CRUD operation. After the change, the new
578             version number is C<version+1>.
579              
580             =item * C<external>
581              
582             Use an external versioning system, such as timestamps or version numbers
583             from an external database. Documents are only changed if the document
584             in Elasticsearch has a B<lower> C<version> number than the one
585             specified in the CRUD operation. After the change, the new version
586             number is C<version>.
587              
588             =back
589              
590             If you would like to reindex documents from one index to another, preserving
591             the C<version> numbers from the original index, then you need the following:
592              
593             $bulk->reindex(
594             source => {
595             index => 'old_index',
596             version => 1, # retrieve version numbers in search
597             },
598             version_type => 'external' # use these "external" version numbers
599             );
600              
601             =head1 AUTHOR
602              
603             Clinton Gormley <drtech@cpan.org>
604              
605             =head1 COPYRIGHT AND LICENSE
606              
607             This software is Copyright (c) 2014 by Elasticsearch BV.
608              
609             This is free software, licensed under:
610              
611             The Apache License, Version 2.0, January 2004
612              
613             =cut
614              
615             __END__
616              
617             # ABSTRACT: A helper module for the Bulk API and for reindexing
618