File Coverage

blib/lib/Elastic/Model/Index.pm
Criterion Covered Total %
statement 12 140 8.5
branch 0 52 0.0
condition 0 35 0.0
subroutine 4 19 21.0
pod 4 4 100.0
total 20 250 8.0


line stmt bran cond sub pod time code
1             package Elastic::Model::Index;
2             $Elastic::Model::Index::VERSION = '0.52';
3 23     23   176 use Carp;
  23         49  
  23         1827  
4 23     23   185 use Moose;
  23         54  
  23         173  
5             with 'Elastic::Model::Role::Index';
6              
7 23     23   148445 use namespace::autoclean;
  23         64  
  23         328  
8              
9 23     23   1937 no Moose;
  23         54  
  23         204  
10              
#===================================
sub create {
#===================================
    my ( $self, @config_args ) = @_;

    # Let index_config() (provided by Elastic::Model::Role::Index) turn the
    # caller's arguments into the configuration hash, then pass that hash to
    # the store as a flat key/value list.
    my $config = $self->index_config(@config_args);
    my $store  = $self->model->store;
    $store->create_index( %{$config} );

    # Return the index object itself so calls can be chained.
    return $self;
}
19              
#===================================
sub reindex {
#===================================
    # Copy every doc from $domain into the index named $self->name, creating
    # that index if needed. Then, unless repoint_uids is false, repoint UIDs
    # that still refer to the old indices from docs in other indices.
    #
    # Params: $domain (required), then key/value options: quiet, scan, size,
    #   bulk_size, transform, on_conflict, on_error, repoint_uids (default 1),
    #   uid_on_conflict, uid_on_error.
    # Returns: 1 when repoint_uids is false, otherwise whatever
    #   repoint_uids() returns.
    # Dies (croak) when no domain is given.
    my $self   = shift;
    my $domain = shift
        or croak "No (domain) passed to reindex()";

    my %args = ( repoint_uids => 1, @_ );

    my $verbose    = !$args{quiet};
    my $scan       = $args{scan}      || '2m';
    my $size       = $args{size}      || 1000;
    my $bulk_size  = $args{bulk_size} || $size;
    my $dest_index = $self->name;
    my $model      = $self->model;
    my $transform  = $args{transform} || sub {@_};

    # Pass the names as arguments rather than interpolating them into the
    # format string: a '%' in a domain or index name would otherwise be
    # parsed as a printf directive and corrupt the message.
    printf "Reindexing domain (%s) to index (%s)\n", $domain, $dest_index
        if $verbose;

    if ( $self->exists ) {
        print "Index ($dest_index) already exists.\n" if $verbose;
    }
    else {
        print "Creating index ($dest_index)\n" if $verbose;
        $self->create();
    }

    # Store all changed UIDs so that we can repoint them later, when they're
    # used in docs that aren't being reindexed.
    my %uids;
    my $doc_updater = sub {
        my ($doc) = $transform->(@_);
        $uids{ $doc->{_index} }{ $doc->{_type} }{ $doc->{_id} } = 1;
        $doc->{_index} = $dest_index;
        return $doc;
    };

    # Map all indices that 'domain' points to, to $index->name.
    my $old = $model->domain($domain)->namespace->alias($domain);
    my %map
        = map { $_ => 1 } $old->is_alias
        ? keys %{ $old->aliased_to }
        : ($domain);

    # UIDs inside the docs being copied that point at a source index are
    # rewritten to point at the destination index.
    my $uid_updater = sub {
        my $uid = shift;
        $uid->{index} = $dest_index
            if $map{ $uid->{index} };
    };

    my $updater = $self->doc_updater( $doc_updater, $uid_updater );

    my $source = $model->view->domain($domain)->size($size)->scan($scan);
    $model->store->reindex(
        source      => sub { $source->shift_element },
        verbose     => $verbose,
        transform   => $updater,
        bulk_size   => $bulk_size,
        on_conflict => $args{on_conflict},
        on_error    => $args{on_error},
    );

    return 1 unless $args{repoint_uids};

    # The indices just reindexed are excluded: their UIDs were already
    # rewritten by $uid_updater above.
    $self->repoint_uids(
        uids        => \%uids,
        verbose     => $verbose,
        exclude     => [ keys %map ],
        size        => $size,
        bulk_size   => $bulk_size,
        scan        => $scan,
        on_conflict => $args{uid_on_conflict},
        on_error    => $args{uid_on_error},
    );
}
94              
#===================================
sub repoint_uids {
#===================================
    # Rewrite stale UID attributes in live indices (other than this one and
    # any in 'exclude') so that UIDs listed in 'uids' point at this index.
    #
    # Args (key/value): uids (hashref index => type => id => 1), exclude
    #   (arrayref of index names), scan, size, bulk_size, verbose or quiet,
    #   on_conflict, on_error.
    # Returns: 1.
    my ( $self, %args ) = @_;

    # reindex() passes 'verbose'; the documented public option is 'quiet'.
    # Honour 'quiet' only when 'verbose' is absent, so reindex() and existing
    # 'verbose' callers behave exactly as before.
    my $verbose
        = exists $args{verbose} ? $args{verbose}
        : exists $args{quiet}   ? !$args{quiet}
        :                         undef;
    my $scan       = $args{scan}      || '2m';
    my $size       = $args{size}      || 1000;
    my $bulk_size  = $args{bulk_size} || $size;
    my $model      = $self->model;
    my $index_name = $self->name;
    my $uids       = $args{uids}      || {};

    unless (%$uids) {
        print "No UIDs to repoint\n" if $verbose;
        return 1;
    }

    # Never rewrite the target index itself, nor any explicitly excluded one.
    my %exclude = map { $_ => 1 } ( $index_name, @{ $args{exclude} || [] } );
    my @indices = grep { not $exclude{$_} } $model->all_live_indices;

    unless (@indices) {
        print "No UIDs to repoint\n" if $verbose;
        return 1;
    }

    my @uid_attrs = $self->_uid_attrs_for_indices(@indices);
    unless (@uid_attrs) {
        print "No UIDs to repoint\n" if $verbose;
        return 1;
    }

    my $view = $model->view->domain( \@indices )->size($size);

    # Increment _version on each doc before it is re-indexed.
    # NOTE(review): presumably so the store accepts the rewrite as a newer
    # version - confirm against store->reindex.
    my $doc_updater = sub {
        my $doc = shift;
        $doc->{_version}++;
        return $doc;
    };

    # Repoint only UIDs listed in %$uids. Look them up level by level so
    # that misses do not autovivify empty entries into the caller's hash.
    my $uid_updater = sub {
        my $uid   = shift;
        my $types = $uids->{ $uid->{index} } or return;
        my $ids   = $types->{ $uid->{type} } or return;
        return unless $ids->{ $uid->{id} };
        $uid->{index} = $index_name;
    };

    my $updater = $self->doc_updater( $doc_updater, $uid_updater );

    for my $index ( keys %$uids ) {
        my $types = $uids->{$index};
        for my $type ( keys %$types ) {
            my @ids = keys %{ $types->{$type} };

            printf( "Repointing %d UIDs from %s/%s\n",
                0 + @ids, $index, $type )
                if $verbose;

            # Query in batches of at most $size IDs per filter.
            while (@ids) {
                my $clauses
                    = $self->_build_uid_clauses( \@uid_attrs, $index, $type,
                    [ splice @ids, 0, $size ] );

                my $source = $view->filter( or => $clauses )->scan($scan);
                $model->store->reindex(
                    source      => sub { $source->shift_element },
                    bulk_size   => $bulk_size,
                    quiet       => 1,
                    transform   => $updater,
                    on_conflict => $args{on_conflict},
                    on_error    => $args{on_error},
                );
            }
            print "\n" if $verbose;
        }
    }

    print "\nDone\n" if $verbose;
    return 1;
}
178              
#===================================
sub _uid_attrs_for_indices {
#===================================
    # Return the distinct dotted paths of UID attributes found in the
    # mappings of the given indices (order not significant).
    my ( $self, @indices ) = @_;

    my $mapping = $self->model->store->get_mapping( index => \@indices );

    # get_mapping() is read as index => { mappings => { type => {...} } };
    # gather every per-type mapping across all indices.
    my @type_mappings = map { values %{ $_->{mappings} } } values %$mapping;

    # Find UID attributes in each type's properties, then de-duplicate.
    my @paths = map { _find_uid_attrs( $_->{properties} ) } @type_mappings;
    my %unique;
    @unique{@paths} = ();

    return keys %unique;
}
191              
192             #===================================
193             sub _find_uid_attrs {
194             #===================================
195 0     0         my ( $mapping, $level ) = @_;
196              
197 0               my @attrs;
198 0 0             $level = '' unless $level;
199              
200 0               keys %$mapping;
201 0               while ( my ( $k, $v ) = each %$mapping ) {
202 0 0                 next unless $v->{properties};
203 0 0                 my $attr = $level ? "$level.$k" : $k;
204              
205 0 0 0               if ( $k eq 'uid' and $v->{properties} and $v->{properties}{index} ) {
      0        
206 0                       push @attrs, $attr;
207 0                       next;
208                     }
209 0   0               push @attrs, _find_uid_attrs( $v->{properties} || {}, $attr );
210                 }
211 0               return @attrs;
212             }
213              
214             #===================================
215             sub _build_uid_clauses {
216             #===================================
217 0     0         my ( $self, $uid_attrs, $index, $type, $ids ) = @_;
218 0               my @clauses;
219 0               for my $id (@$ids) {
220                     push @clauses, map {
221 0                       +{ and => [
  0            
222                                 { term => { "$_.index" => $index } },
223                                 { term => { "$_.type" => $type } },
224                                 { term => { "$_.id" => $id } }
225                             ]
226                             }
227                     } @$uid_attrs;
228                 }
229 0               return \@clauses;
230             }
231              
#===================================
sub doc_updater {
#===================================
    # Build a transform coderef for store->reindex(). The coderef first
    # passes the doc through $doc_updater, then walks the doc's _source
    # (nested hashes and arrays) and calls $uid_updater on every
    # { uid => { index => ..., type => ... } } value found.
    #
    # Params: $doc_updater - coderef; receives the doc and returns it.
    #         $uid_updater - coderef; receives each UID hashref to modify.
    # Returns: the transform coderef, which returns the (modified) doc.
    my ( $self, $doc_updater, $uid_updater ) = @_;
    return sub {
        my $doc   = $doc_updater->(@_);
        my @stack = values %{ $doc->{_source} };

        # Test the stack size, not the shifted value: a field holding 0, ''
        # or undef must not end the walk early and leave later UIDs stale.
        while (@stack) {
            my $val = shift @stack;
            unless ( ref $val eq 'HASH' ) {
                # Descend into arrays; scalars cannot contain UIDs.
                push @stack, @$val if ref $val eq 'ARRAY';
                next;
            }
            my $uid = $val->{uid};
            if (    $uid
                and ref $uid eq 'HASH'
                and $uid->{index}
                and $uid->{type} )
            {
                $uid_updater->($uid);
            }
            else {
                push @stack, values %$val;
            }
        }
        return $doc;
    };
}
260              
261             __PACKAGE__->meta->make_immutable;
262              
263             1;
264              
265             =pod
266            
267             =encoding UTF-8
268            
269             =head1 NAME
270            
271             Elastic::Model::Index - Create and administer indices in Elasticsearch
272            
273             =head1 VERSION
274            
275             version 0.52
276            
277             =head1 SYNOPSIS
278            
279             $index = $model->namespace('myapp')->index;
280             $index = $model->namespace('myapp')->index('index_name');
281            
282             $index->create( settings => \%settings );
283            
284             $index->reindex( 'old_index' );
285            
286             See also L<Elastic::Model::Role::Index/SYNOPSIS>.
287            
288             =head1 DESCRIPTION
289            
290             L<Elastic::Model::Index> objects are used to create and administer indices
291             in an Elasticsearch cluster.
292            
293             See L<Elastic::Model::Role::Index> for more about usage.
294             See L<Elastic::Manual::Scaling> for more about how indices can be used in your
295             application.
296            
297             =head1 METHODS
298            
299             =head2 create()
300            
301             $index = $index->create();
302             $index = $index->create( settings => \%settings, types => \@types );
303            
304             Creates an index called L<name|Elastic::Model::Role::Index/name> (which
305             defaults to C<< $namespace->name >>).
306            
307             The L<type mapping|Elastic::Manual::Terminology/Mapping> is automatically
308             generated from the attributes of your doc classes listed in the
309             L<namespace|Elastic::Model::Namespace>. Similarly, any
310             L<custom analyzers|Elastic::Model/"Custom analyzers"> required
311             by your classes are added to the index
312             L<\%settings|http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/indices-update-settings.html>
313             that you pass in:
314            
315             $index->create( settings => {number_of_shards => 1} );
316            
317             To create an index with a sub-set of the types known to the
318             L<namespace|Elastic::Model::Namespace>, pass in a list of C<@types>.
319            
320             $index->create( types => ['user','post' ]);
321            
322             =head2 reindex()
323            
324             # reindex $domain_name to $index->name
325             $index->reindex( $domain_name );
326            
327             # more options
328             $index->reindex(
329             $domain,
330            
331             repoint_uids => 1,
332             size => 1000,
333             bulk_size => 1000,
334             scan => '2m',
335             quiet => 0,
336            
337             transform => sub {...},
338            
339             on_conflict => sub {...} | 'IGNORE'
340             on_error => sub {...} | 'IGNORE'
341             uid_on_conflict => sub {...} | 'IGNORE'
342             uid_on_error => sub {...} | 'IGNORE'
343             );
344            
345             While you can add to the L<mapping|Elastic::Manual::Terminology/Mapping> of
346             an index, you can't change what is already there. Especially during development,
347             you will need to reindex your data to a new index.
348            
349             L</reindex()> reindexes your data from L<domain|Elastic::Manual::Terminology/Domain>
350             C<$domain_name> into an index called C<< $index->name >>. The new index is
351             created if it doesn't already exist.
352            
353             See L<Elastic::Manual::Reindex> for more about reindexing strategies. The
354             documentation below explains what each parameter does:
355            
356             =over
357            
358             =item size
359            
360             The C<size> parameter defaults to 1,000 and controls how many documents
361             are pulled from C<$domain> in each request. See L<Elastic::Model::View/size>.
362            
363             B<Note:> documents are pulled from the C<domain>/C<view> using
364             L<Elastic::Model::View/scan()>, which can pull a maximum of
365             L<size|Elastic::Model::View/size> C<* number_of_primary_shards> in a single
366             request. If you have large docs or underpowered servers, you may want to
367             change the C<size> parameter.
368            
369             =item bulk_size
370            
371             The C<bulk_size> parameter defaults to C<size> and controls how many documents
372             are indexed into the new domain in a single bulk-indexing request.
373            
374             =item scan
375            
376             C<scan> is the same as L<Elastic::Model::View/scan> - it controls how long
377             Elasticsearch should keep the "scroll" live between requests. Defaults to
378             '2m'. Increase this if the reindexing process is slow and you get
379             scroll timeouts.
380            
381             =item repoint_uids
382            
383             If true (the default), L</repoint_uids()> will be called automatically to
384             update any L<UIDs|Elastic::Model::UID> (which point at the old index) in
385             indices other than the ones currently being reindexed.
386            
387             =item transform
388            
389             If you need to change the structure/data of your doc while reindexing, you
390             can pass a C<transform> coderef. This will be called before any changes
391             have been made to the doc, and should return the new doc. For instance,
392             to convert the single-value C<tag> field to an array of C<tags>:
393            
394             $index->reindex(
395             'new_index',
396             'transform' => sub {
397             my $doc = shift;
398             $doc->{_source}{tags} = [ delete $doc->{_source}{tag} ];
399             return $doc
400             }
401             );
402            
403             =item on_conflict / on_error
404            
405             If you are indexing to the new index at the same time as you
406             are reindexing, you may get document conflicts. You can handle the conflicts
407             with a coderef callback, or ignore them by setting C<on_conflict> to
408             C<'IGNORE'>:
409            
410             $index->reindex( 'myapp_v2', on_conflict => 'IGNORE' );
411            
412             Similarly, you can pass an C<on_error> handler which will handle other errors,
413             or all errors if no C<on_conflict> handler is defined.
414            
415             See L<Search::Elasticsearch::Bulk/Using-callbacks> for more.
416            
417             =item uid_on_conflict / uid_on_error
418            
419             These work in the same way as the C<on_conflict> or C<on_error> handlers,
420             but are passed to L</repoint_uids()> if C<repoint_uids> is true.
421            
422             =item quiet
423            
424             By default, L</reindex()> prints out progress information. To silence this,
425             set C<quiet> to true:
426            
427             $index->reindex( 'myapp_v2', quiet => 1 );
428            
429             =back
430            
431             =head2 repoint_uids()
432            
433             $index->repoint_uids(
434             uids => { myapp_v1 => { user => { 10 => 1, 12 => 1 } } },
435             exclude => ['myapp_v2'],
436             scan => '2m',
437             size => 1000,
438             bulk_size => 1000,
439             quiet => 0,
440            
441             on_conflict => sub {...} | 'IGNORE'
442             on_error => sub {...} | 'IGNORE'
443             );
444            
445             The purpose of L</repoint_uids()> is to update stale L<UID|Elastic::Model::UID>
446             attributes to point to a new index. It is called automatically from
447             L</reindex()>.
448            
449             Parameters:
450            
451             =over
452            
453             =item uids
454            
455             C<uids> is a hash ref of the stale L<UIDs|Elastic::Model::UID> which should be
456             updated, keyed by index, then type, then ID.
457            
458             For instance: you have reindexed C<myapp_v1> to C<myapp_v2>, but domain
459             C<other> has documents with UIDs which point to C<myapp_v1>. You
460             can update these by passing a hash ref of the old UIDs, as follows:
461            
462             $index = $namespace->index('myapp_v2');
463             $index->repoint_uids(
464             uids => { # index
465             myapp_v1 => { # type
466             user => {
467             1 => 1, # ids
468             2 => 1,
469             }
470             }
471             }
472             );
473            
474             =item exclude
475            
476             By default, all indices known to the L<model|Elastic::Model::Role::Model> are
477             updated. You can exclude indices with:
478            
479             $index->repoint_uids(
480             uids => \%uids,
481             exclude => ['index_1', ...]
482             );
483            
484             =item size
485            
486             This is the same as the C<size> parameter to L</reindex()>.
487            
488             =item bulk_size
489            
490             This is the same as the C<bulk_size> parameter to L</reindex()>.
491            
492             =item scan
493            
494             This is the same as the C<scan> parameter to L</reindex()>.
495            
496             =item quiet
497            
498             This is the same as the C<quiet> parameter to L</reindex()>.
499            
500             =item on_conflict / on_error
501            
502             These are the same as the C<uid_on_conflict> and C<uid_on_error> handlers
503             in L</reindex()>.
504            
505             =back
506            
507             =head2 doc_updater()
508            
509             $coderef = $index->doc_updater( $doc_updater, $uid_updater );
510            
511             L</doc_updater()> is used by L</reindex()> and L</repoint_uids()> to update
512             the top-level doc and any UID attributes with callbacks.
513            
514             The C<$doc_updater> receives the C<$doc> as its only argument, and should
515             return the C<$doc> after making any changes:
516            
517             $doc_updater = sub {
518             my ($doc) = @_;
519             $doc->{_index} = 'foo';
520             return $doc
521             };
522            
523             The C<$uid_updater> receives the UID hash ref as its only argument:
524            
525             $uid_updater = sub {
526             my ($uid) = @_;
527             $uid->{index} = 'foo'
528             };
529            
530             =head1 IMPORTED ATTRIBUTES
531            
532             Attributes imported from L<Elastic::Model::Role::Index>
533            
534             =head2 L<namespace|Elastic::Model::Role::Index/namespace>
535            
536             =head2 L<name|Elastic::Model::Role::Index/name>
537            
538             =head1 IMPORTED METHODS
539            
540             Methods imported from L<Elastic::Model::Role::Index>
541            
542             =head2 L<close()|Elastic::Model::Role::Index/close()>
543            
544             =head2 L<open()|Elastic::Model::Role::Index/open()>
545            
546             =head2 L<refresh()|Elastic::Model::Role::Index/refresh()>
547            
548             =head2 L<delete()|Elastic::Model::Role::Index/delete()>
549            
550             =head2 L<update_analyzers()|Elastic::Model::Role::Index/update_analyzers()>
551            
552             =head2 L<update_settings()|Elastic::Model::Role::Index/update_settings()>
553            
554             =head2 L<delete_mapping()|Elastic::Model::Role::Index/delete_mapping()>
555            
556             =head2 L<is_alias()|Elastic::Model::Role::Index/is_alias()>
557            
558             =head2 L<is_index()|Elastic::Model::Role::Index/is_index()>
559            
560             =head1 SEE ALSO
561            
562             =over
563            
564             =item *
565            
566             L<Elastic::Model::Role::Index>
567            
568             =item *
569            
570             L<Elastic::Model::Alias>
571            
572             =item *
573            
574             L<Elastic::Model::Namespace>
575            
576             =item *
577            
578             L<Elastic::Manual::Scaling>
579            
580             =back
581            
582             =head1 AUTHOR
583            
584             Clinton Gormley <drtech@cpan.org>
585            
586             =head1 COPYRIGHT AND LICENSE
587            
588             This software is copyright (c) 2015 by Clinton Gormley.
589            
590             This is free software; you can redistribute it and/or modify it under
591             the same terms as the Perl 5 programming language system itself.
592            
593             =cut
594              
595             __END__
596            
597             # ABSTRACT: Create and administer indices in Elasticsearch
598            
599