File Coverage

blib/lib/Elasticsearch/Role/Bulk.pm
Criterion Covered Total %
statement 9 98 9.1
branch 0 46 0.0
condition 0 26 0.0
subroutine 3 18 16.6
pod 0 8 0.0
total 12 196 6.1


line stmt bran cond sub pod time code
1             package Elasticsearch::Role::Bulk;
2             $Elasticsearch::Role::Bulk::VERSION = '1.05';
3 4     4   40403 use Moo::Role;
  4         9  
  4         31  
4             requires 'add_action', 'flush';
5              
6 4     4   2153 use Elasticsearch::Util qw(parse_params throw);
  4         11  
  4         64  
7 4     4   1980 use namespace::clean;
  4         10  
  4         41  
8              
9             has 'es' => ( is => 'ro', required => 1 );
10             has 'max_count' => ( is => 'rw', default => 1_000 );
11             has 'max_size' => ( is => 'rw', default => 1_000_000 );
12             has 'on_success' => ( is => 'ro', default => 0 );
13             has 'on_error' => ( is => 'lazy' );
14             has 'on_conflict' => ( is => 'ro', default => 0 );
15             has 'verbose' => ( is => 'rw' );
16              
17             has '_buffer' => ( is => 'ro', default => sub { [] } );
18             has '_buffer_size' => ( is => 'rw', default => 0 );
19             has '_buffer_count' => ( is => 'rw', default => 0 );
20             has '_serializer' => ( is => 'lazy' );
21             has '_bulk_args' => ( is => 'ro' );
22              
23             our $Conflict = qr/
24             DocumentAlreadyExistsException
25             | :.version.conflict,.current.\[(\d+)\]
26             /x;
27              
28             our %Actions = (
29             'index' => 1,
30             'create' => 1,
31             'update' => 1,
32             'delete' => 1
33             );
34              
35             our @Metadata_Keys = (
36             'index', 'type', 'id', 'routing',
37             'parent', 'timestamp', 'ttl', 'version',
38             'version_type'
39             );
40              
41             #===================================
42 0     0     sub _build__serializer { shift->es->transport->serializer }
43             #===================================
44              
45             #===================================
46             sub _build_on_error {
47             #===================================
48 0     0     my $self = shift;
49 0           my $serializer = $self->_serializer;
50             return sub {
51 0     0     my ( $action, $result, $src ) = @_;
52 0           warn( "Bulk error [$action]: " . $serializer->encode($result) );
53 0           };
54             }
55              
56             #===================================
57             sub BUILDARGS {
58             #===================================
59 0     0 0   my ( $class, $params ) = parse_params(@_);
60 0           my %args;
61 0           for (qw(index type consistency refresh replication timeout)) {
62 0 0         $args{$_} = $params->{$_}
63             if exists $params->{$_};
64             }
65 0           $params->{_bulk_args} = \%args;
66 0           return $params;
67             }
68              
69             #===================================
70             sub index {
71             #===================================
72 0     0 0   shift->add_action( map { ( 'index' => $_ ) } @_ );
  0            
73             }
74              
75             #===================================
76             sub create {
77             #===================================
78 0     0 0   shift->add_action( map { ( 'create' => $_ ) } @_ );
  0            
79             }
80              
81             #===================================
82             sub delete {
83             #===================================
84 0     0 0   shift->add_action( map { ( 'delete' => $_ ) } @_ );
  0            
85             }
86              
87             #===================================
88             sub update {
89             #===================================
90 0     0 0   shift->add_action( map { ( 'update' => $_ ) } @_ );
  0            
91             }
92              
93             #===================================
94             sub create_docs {
95             #===================================
96 0     0 0   my $self = shift;
97 0           $self->add_action( map { ( 'create' => { _source => $_ } ) } @_ );
  0            
98             }
99              
100             #===================================
101             sub delete_ids {
102             #===================================
103 0     0 0   my $self = shift;
104 0           $self->add_action( map { ( 'delete' => { _id => $_ } ) } @_ );
  0            
105             }
106              
107             #===================================
108             sub _encode_action {
109             #===================================
110 0     0     my $self = shift;
111 0   0       my $action = shift || '';
112 0           my $orig = shift;
113              
114 0 0         throw( 'Param', "Unrecognised action <$action>" )
115             unless $Actions{$action};
116              
117 0 0         throw( 'Param', "Missing <params> for action <$action>" )
118             unless ref($orig) eq 'HASH';
119              
120 0           my %metadata;
121 0           my $params = {%$orig};
122 0           my $serializer = $self->_serializer;
123              
124 0           for (@Metadata_Keys) {
125 0 0         my $val
    0          
126             = exists $params->{$_} ? delete $params->{$_}
127             : exists $params->{"_$_"} ? delete $params->{"_$_"}
128             : next;
129 0           $metadata{"_$_"} = $val;
130             }
131              
132 0 0 0       throw( 'Param', "Missing required param <index>" )
133             unless $metadata{_index} || $self->_bulk_args->{index};
134 0 0 0       throw( 'Param', "Missing required param <type>" )
135             unless $metadata{_type} || $self->_bulk_args->{type};
136              
137 0           my $source;
138 0 0         if ( $action eq 'update' ) {
    0          
139 0           for (qw(doc upsert doc_as_upsert script params lang)) {
140 0 0         $source->{$_} = delete $params->{$_}
141             if exists $params->{$_};
142             }
143             }
144             elsif ( $action ne 'delete' ) {
145 0   0       $source
146             = delete $params->{_source}
147             || delete $params->{source}
148             || throw(
149             'Param',
150             "Missing <source> for action <$action>: "
151             . $serializer->encode($orig)
152             );
153             }
154              
155 0 0         throw( "Unknown params <"
156             . ( join ',', sort keys %$params )
157             . "> in <$action>: "
158             . $serializer->encode($orig) )
159             if keys %$params;
160              
161 0           return map { $serializer->encode($_) }
  0            
162 0           grep {$_} ( { $action => \%metadata }, $source );
163             }
164              
165             #===================================
166             sub _report {
167             #===================================
168 0     0     my ( $self, $buffer, $results ) = @_;
169 0           my $on_success = $self->on_success;
170 0           my $on_error = $self->on_error;
171 0           my $on_conflict = $self->on_conflict;
172              
173             # assume errors if key not present, bwc
174 0 0         $results->{errors} = 1 unless exists $results->{errors};
175              
176             return
177 0 0 0       unless $on_success
      0        
      0        
178             || ( $results->{errors} and $on_error || $on_conflict );
179              
180 0           my $serializer = $self->_serializer;
181              
182 0           my $j = 0;
183              
184 0           for my $item ( @{ $results->{items} } ) {
  0            
185 0           my ( $action, $result ) = %$item;
186 0           my @args = ($action);
187 0 0         if ( my $error = $result->{error} ) {
188 0 0 0       $on_conflict && $error =~ /$Conflict/
      0        
189             ? $on_conflict->( $action, $result, $j, $1 )
190             : $on_error && $on_error->( $action, $result, $j );
191             }
192             else {
193 0 0         $on_success && $on_success->( $action, $result, $j );
194             }
195 0           $j++;
196             }
197             }
198              
199             #===================================
200             sub clear_buffer {
201             #===================================
202 0     0 0   my $self = shift;
203 0           @{ $self->_buffer } = ();
  0            
204 0           $self->_buffer_size(0);
205 0           $self->_buffer_count(0);
206             }
207              
208             #===================================
209             sub _doc_transformer {
210             #===================================
211 0     0     my ( $self, $params ) = @_;
212              
213 0           my $bulk_args = $self->_bulk_args;
214 0           my %allowed = map { $_ => 1, "_$_" => 1 } ( @Metadata_Keys, 'source' );
  0            
215 0           $allowed{fields} = 1;
216              
217 0 0         delete @allowed{ 'index', '_index' } if $bulk_args->{index};
218 0 0         delete @allowed{ 'type', '_type' } if $bulk_args->{type};
219              
220 0           my $version_type = $params->{version_type};
221 0           my $transform = $params->{transform};
222              
223             return sub {
224 0     0     my %doc = %{ shift() };
  0            
225 0           for ( keys %doc ) {
226 0 0         delete $doc{$_} unless $allowed{$_};
227             }
228              
229 0 0         if ( my $fields = delete $doc{fields} ) {
230 0           for (qw(_routing routing _parent parent)) {
231 0 0         $doc{$_} = $fields->{$_}
232             if exists $fields->{$_};
233             }
234             }
235 0 0         $doc{_version_type} = $version_type if $version_type;
236              
237 0 0         return \%doc unless $transform;
238 0           return $transform->( \%doc );
239 0           };
240             }
241              
242             1;
243              
244             # ABSTRACT: Provides common functionality to L<Elasticseach::Bulk> and L<Elasticsearch::Async::Bulk>
245              
246             __END__
247              
248             =pod
249              
250             =encoding UTF-8
251              
252             =head1 NAME
253              
254             Elasticsearch::Role::Bulk - Provides common functionality to L<Elasticseach::Bulk> and L<Elasticsearch::Async::Bulk>
255              
256             =head1 VERSION
257              
258             version 1.05
259              
260             =head1 AUTHOR
261              
262             Clinton Gormley <drtech@cpan.org>
263              
264             =head1 COPYRIGHT AND LICENSE
265              
266             This software is Copyright (c) 2014 by Elasticsearch BV.
267              
268             This is free software, licensed under:
269              
270             The Apache License, Version 2.0, January 2004
271              
272             =cut