File Coverage

blib/lib/Search/Elasticsearch/Client/5_0/Bulk.pm
Criterion Covered Total %
statement 37 43 86.0
branch 4 10 40.0
condition 5 15 33.3
subroutine 8 8 100.0
pod 2 2 100.0
total 56 78 71.7


line stmt bran cond sub pod time code
1             package Search::Elasticsearch::Client::5_0::Bulk;
2             $Search::Elasticsearch::Client::5_0::Bulk::VERSION = '6.00';
3 1     1   953 use Moo;
  1         5  
  1         10  
4             with 'Search::Elasticsearch::Client::5_0::Role::Bulk',
5             'Search::Elasticsearch::Role::Is_Sync';
6 1     1   630 use Search::Elasticsearch::Util qw(parse_params throw);
  1         4  
  1         10  
7 1     1   755 use Try::Tiny;
  1         5  
  1         103  
8 1     1   10 use namespace::clean;
  1         10  
  1         13  
9              
10             #===================================
11             sub add_action {
12             #===================================
13 1     1 1 3 my $self = shift;
14 1         6 my $buffer = $self->_buffer;
15 1         4 my $max_size = $self->max_size;
16 1         4 my $max_count = $self->max_count;
17 1         4 my $max_time = $self->max_time;
18              
19 1         5 while (@_) {
20 1         7 my @json = $self->_encode_action( splice( @_, 0, 2 ) );
21              
22 1         122 push @$buffer, @json;
23              
24 1         5 my $size = $self->_buffer_size;
25 1         4 $size += length($_) + 1 for @json;
26 1         4 $self->_buffer_size($size);
27              
28 1         4 my $count = $self->_buffer_count( $self->_buffer_count + 1 );
29              
30 1 50 33     16 $self->flush
      33        
      33        
      33        
      33        
31             if ( $max_size and $size >= $max_size )
32             || ( $max_count and $count >= $max_count )
33             || ( $max_time and time >= $self->_last_flush + $max_time );
34             }
35 1         2 return 1;
36             }
37              
38             #===================================
39             sub flush {
40             #===================================
41 1     1 1 1144 my $self = shift;
42 1         8 $self->_last_flush(time);
43              
44 1 50       17 return { items => [] }
45             unless $self->_buffer_size;
46              
47 1 50       9 if ( $self->verbose ) {
48 0         0 local $| = 1;
49 0         0 print ".";
50             }
51 1         5 my $buffer = $self->_buffer;
52             my $results = try {
53 1     1   167 my $res = $self->es->bulk( %{ $self->_bulk_args }, body => $buffer );
  1         15  
54 0         0 $self->clear_buffer;
55 0         0 return $res;
56             }
57             catch {
58 1     1   2009469 my $error = $_;
59 1 50       5 $self->clear_buffer
60             unless $error->is( 'Cxn', 'NoNodes' );
61              
62 1         29 die $error;
63 1         15 };
64 0           $self->_report( $buffer, $results );
65 0 0         return defined wantarray ? $results : undef;
66             }
67              
68             1;
69              
70             =pod
71              
72             =encoding UTF-8
73              
74             =head1 NAME
75              
76             Search::Elasticsearch::Client::5_0::Bulk - A helper module for the Bulk API
77              
78             =head1 VERSION
79              
80             version 6.00
81              
82             =head1 SYNOPSIS
83              
84             use Search::Elasticsearch;
85              
86             my $es = Search::Elasticsearch->new;
87             my $bulk = $es->bulk_helper(
88             index => 'my_index',
89             type => 'my_type'
90             );
91              
92             # Index docs:
93             $bulk->index({ id => 1, source => { foo => 'bar' }});
94             $bulk->add_action( index => { id => 1, source => { foo=> 'bar' }});
95              
96             # Create docs:
97             $bulk->create({ id => 1, source => { foo => 'bar' }});
98             $bulk->add_action( create => { id => 1, source => { foo=> 'bar' }});
99             $bulk->create_docs({ foo => 'bar' });
100              
101             # Delete docs:
102             $bulk->delete({ id => 1});
103             $bulk->add_action( delete => { id => 1 });
104             $bulk->delete_ids(1,2,3);
105              
106             # Update docs:
107             $bulk->update({ id => 1, script => '...' });
108             $bulk->add_action( update => { id => 1, script => '...' });
109              
110             # Manual flush
111             $bulk->flush;
112              
113             =head1 DESCRIPTION
114              
115             This module provides a wrapper for the C<bulk()>
116             method which makes it easier to run multiple create, index, update or delete
117             actions in a single request.
118              
119             The C<Search::Elasticsearch::Client::5_0::Bulk> module acts as a queue, buffering up actions
120             until it reaches a maximum count of actions, or a maximum size of JSON request
121             body, at which point it issues a C<bulk()> request.
122              
123             Once you have finished adding actions, call L</flush()> to force the final
124             C<bulk()> request on the items left in the queue.
125              
126             This class does L<Search::Elasticsearch::Client::5_0::Role::Bulk> and
127             L<Search::Elasticsearch::Role::Is_Sync>.
128              
129             =head1 CREATING A NEW INSTANCE
130              
131             =head2 C<new()>
132              
133             my $bulk = $es->bulk_helper(
134              
135             index => 'default_index', # optional
136             type => 'default_type', # optional
137             %other_bulk_params # optional
138              
139             max_count => 1_000, # optional
140             max_size => 1_000_000, # optional
141             max_time => 5, # optional
142              
143             verbose => 0 | 1, # optional
144              
145             on_success => sub {...}, # optional
146             on_error => sub {...}, # optional
147             on_conflict => sub {...}, # optional
148              
149              
150             );
151              
152             The C<new()> method returns a new C<$bulk> object. You must pass your
153             Search::Elasticsearch client as the C<es> argument.
154              
155             The C<index> and C<type> parameters provide default values for
156             C<index> and C<type>, which can be overridden in each action.
157             You can also pass any other values which are accepted
158             by the C<bulk()> method of the Search::Elasticsearch client.
159              
160             See L</FLUSHING THE BUFFER> for more information about the other parameters.
161              
162             =head1 FLUSHING THE BUFFER
163              
164             =head2 C<flush()>
165              
166             $result = $bulk->flush;
167              
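168             The C<flush()> method sends all buffered actions to Elasticsearch using
169             a C<bulk()> request. If the buffer is empty, it returns C<< { items => [] } >> without sending a request.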
170              
171             =head2 Auto-flushing
172              
173             An automatic L</flush()> is triggered whenever the C<max_count>, C<max_size>,
174             or C<max_time> threshold is breached. This causes all actions in the buffer to be
175             sent to Elasticsearch.
176              
177             =over
178              
179             =item * C<max_count>
180              
181             The maximum number of actions to allow before triggering a L</flush()>.
182             This can be disabled by setting C<max_count> to C<0>. Defaults to
183             C<1,000>.
184              
185             =item * C<max_size>
186              
187             The maximum size of JSON request body to allow before triggering a
188             L</flush()>. This can be disabled by setting C<max_size> to C<0>. Defaults
189             to C<1,000,000> bytes.
190              
191             =item * C<max_time>
192              
193             The maximum number of seconds since the last flush before triggering a flush. Defaults
194             to C<0> seconds, which means that it is disabled. B<Note:> This timeout
195             is only triggered when new items are added to the queue, not in the background.
196              
197             =back
198              
199             =head2 Errors when flushing
200              
201             There are two types of error which can occur when L</flush()>
202             is called, either manually or automatically.
203              
204             =over
205              
206             =item * Temporary Elasticsearch errors
207              
208             A C<Cxn> or C<NoNodes> error, which indicates that your cluster is down or unreachable.
209             These errors do not clear the buffer, as they can be retried later on. The error is rethrown to the caller.
210              
211             =item * Action errors
212              
213             Individual actions may fail. For instance, a C<create> action will fail
214             if a document with the same C<index>, C<type> and C<id> already exists.
215             These action errors are reported via L</Using callbacks>.
216              
217             =back
218              
219             =head2 Using callbacks
220              
221             By default, any I<Action errors> (see above) cause warnings to be
222             written to C<STDERR>. However, you can use the C<on_success>, C<on_error>
223             and C<on_conflict> callbacks for more fine-grained control.
224              
225             All callbacks receive the following arguments:
226              
227             =over
228              
229             =item C<$action>
230              
231             The name of the action, ie C<index>, C<create>, C<update> or C<delete>.
232              
233             =item C<$response>
234              
235             The response that Elasticsearch returned for this action.
236              
237             =item C<$i>
238              
239             The index of the action, ie the first action in the flush request
240             will have C<$i> set to C<0>, the second will have C<$i> set to C<1> etc.
241              
242             =back
243              
244             =head3 C<on_success>
245              
246             my $bulk = $es->bulk_helper(
247             on_success => sub {
248             my ($action,$response,$i) = @_;
249             # do something
250             },
251             );
252              
253             The C<on_success> callback is called for every action that has a successful
254             response.
255              
256             =head3 C<on_conflict>
257              
258             my $bulk = $es->bulk_helper(
259             on_conflict => sub {
260             my ($action,$response,$i,$version) = @_;
261             # do something
262             },
263             );
264              
265             The C<on_conflict> callback is called for actions that have triggered
266             a C<Conflict> error, eg trying to C<create> a document which already
267             exists. The C<$version> argument will contain the version number
268             of the document currently stored in Elasticsearch (if found).
269              
270             =head3 C<on_error>
271              
272             my $bulk = $es->bulk_helper(
273             on_error => sub {
274             my ($action,$response,$i) = @_;
275             # do something
276             },
277             );
278              
279             The C<on_error> callback is called for any error (unless the C<on_conflict>
280             callback has already been called).
281              
282             =head2 Disabling callbacks and autoflush
283              
284             If you want to be in control of flushing, and you just want to receive
285             the raw response that Elasticsearch sends instead of using callbacks,
286             then you can do so as follows:
287              
288             my $bulk = $es->bulk_helper(
289             max_count => 0,
290             max_size => 0,
291             on_error => undef
292             );
293              
294             $bulk->add_action(....);
295             $response = $bulk->flush;
296              
297             =head1 CREATE, INDEX, UPDATE, DELETE
298              
299             =head2 C<add_action()>
300              
301             $bulk->add_action(
302             create => { ...params... },
303             index => { ...params... },
304             update => { ...params... },
305             delete => { ...params... }
306             );
307              
308             The C<add_action()> method allows you to add multiple C<create>, C<index>,
309             C<update> and C<delete> actions to the queue. The first value is the action
310             type, and the second value is the parameters that describe that action.
311             See the individual helper methods below for details.
312              
313             B<NOTE:> Parameters like C<index> or C<type> can be specified as C<index> or as
314             C<_index>, so the following two lines are equivalent:
315              
316             index => { index => 'index', type => 'type', id => 1, source => {...}},
317             index => { _index => 'index', _type => 'type', _id => 1, source => {...}},
318              
319             B<NOTE:> The C<index> and C<type> parameters can be specified in the
320             params for any action, but if not specified, will default to the C<index>
321             and C<type> values specified in L</new()>. These are required parameters:
322             they must be specified either in L</new()> or in every action.
323              
324             =head2 C<create()>
325              
326             $bulk->create(
327             { index => 'custom_index', source => { doc body }},
328             { type => 'custom_type', id => 1, source => { doc body }},
329             ...
330             );
331              
332             The C<create()> helper method allows you to add multiple C<create> actions.
333             It accepts the same parameters as the C<create()> method of the Search::Elasticsearch client,
334             except that the document body should be passed as the C<source> or C<_source>
335             parameter, instead of as C<body>.
336              
337             =head2 C<create_docs()>
338              
339             $bulk->create_docs(
340             { doc body },
341             { doc body },
342             ...
343             );
344              
345             The C<create_docs()> helper is a shorter form of L</create()> which can be used
346             when you are using the default C<index> and C<type> as set in L</new()>
347             and you are not specifying a custom C<id> per document. In this case,
348             you can just pass the individual document bodies.
349              
350             =head2 C<index()>
351              
352             $bulk->index(
353             { index => 'custom_index', source => { doc body }},
354             { type => 'custom_type', id => 1, source => { doc body }},
355             ...
356             );
357              
358             The C<index()> helper method allows you to add multiple C<index> actions.
359             It accepts the same parameters as the C<index()> method of the Search::Elasticsearch client,
360             except that the document body should be passed as the C<source> or C<_source>
361             parameter, instead of as C<body>.
362              
363             =head2 C<delete()>
364              
365             $bulk->delete(
366             { index => 'custom_index', id => 1},
367             { type => 'custom_type', id => 2},
368             ...
369             );
370              
371             The C<delete()> helper method allows you to add multiple C<delete> actions.
372             It accepts the same parameters as the C<delete()> method of the Search::Elasticsearch client.
373              
374             =head2 C<delete_ids()>
375              
376             $bulk->delete_ids(1,2,3...)
377              
378             The C<delete_ids()> helper method can be used when all of the documents you
379             want to delete have the default C<index> and C<type> as set in L</new()>.
380             In this case, all you have to do is to pass in a list of IDs.
381              
382             =head2 C<update()>
383              
384             $bulk->update(
385             { id => 1,
386             doc => { partial doc },
387             doc_as_upsert => 1
388             },
389             { id => 2,
390             script => { script },
391             upsert => { upsert doc }
392             },
393             ...
394             );
395              
396             The C<update()> helper method allows you to add multiple C<update> actions.
397             It accepts the same parameters as the C<update()> method of the Search::Elasticsearch client.
398             An update can either use a I<partial doc> which gets merged with an existing
399             doc (example 1 above), or can use a C