File Coverage

blib/lib/Search/Elasticsearch/Role/Cxn.pm
Criterion Covered Total %
statement 150 198 75.7
branch 43 82 52.4
condition 29 50 58.0
subroutine 31 33 93.9
pod 9 13 69.2
total 262 376 69.6


line stmt bran cond sub pod time code
1             # Licensed to Elasticsearch B.V. under one or more contributor
2             # license agreements. See the NOTICE file distributed with
3             # this work for additional information regarding copyright
4             # ownership. Elasticsearch B.V. licenses this file to you under
5             # the Apache License, Version 2.0 (the "License"); you may
6             # not use this file except in compliance with the License.
7             # You may obtain a copy of the License at
8             #
9             # http://www.apache.org/licenses/LICENSE-2.0
10             #
11             # Unless required by applicable law or agreed to in writing,
12             # software distributed under the License is distributed on an
13             # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14             # KIND, either express or implied. See the License for the
15             # specific language governing permissions and limitations
16             # under the License.
17              
18             package Search::Elasticsearch::Role::Cxn;
19             $Search::Elasticsearch::Role::Cxn::VERSION = '7.717';
20 55     55   259180 use Moo::Role;
  55         6287  
  55         309  
21 55     55   16833 use Search::Elasticsearch::Util qw(parse_params throw to_list);
  55         126  
  55         738  
22 55     55   21420 use List::Util qw(min);
  55         103  
  55         4916  
23 55     55   317 use Try::Tiny;
  55         93  
  55         2296  
24 55     55   10656 use URI();
  55         91657  
  55         929  
25 55     55   22434 use IO::Compress::Deflate();
  55         1432921  
  55         1209  
26 55     55   21869 use IO::Uncompress::Inflate();
  55         573409  
  55         1180  
27 55     55   23534 use IO::Compress::Gzip();
  55         249939  
  55         1401  
28 55     55   22490 use IO::Uncompress::Gunzip qw(gunzip $GunzipError);
  55         84575  
  55         4997  
29 55     55   394 use Search::Elasticsearch::Util qw(to_list);
  55         113  
  55         489  
30 55     55   15506 use namespace::clean;
  55         108  
  55         520  
31 55     55   44177 use Net::IP;
  55         2323694  
  55         124193  
32              
33             requires qw(perform_request error_from_text handle);
34              
35             has 'host' => ( is => 'ro', required => 1 );
36             has 'port' => ( is => 'ro', required => 1 );
37             has 'uri' => ( is => 'ro', required => 1 );
38             has 'request_timeout' => ( is => 'ro', default => 30 );
39             has 'ping_timeout' => ( is => 'ro', default => 2 );
40             has 'sniff_timeout' => ( is => 'ro', default => 1 );
41             has 'sniff_request_timeout' => ( is => 'ro', default => 2 );
42             has 'next_ping' => ( is => 'rw', default => 0 );
43             has 'ping_failures' => ( is => 'rw', default => 0 );
44             has 'dead_timeout' => ( is => 'ro', default => 60 );
45             has 'max_dead_timeout' => ( is => 'ro', default => 3600 );
46             has 'serializer' => ( is => 'ro', required => 1 );
47             has 'logger' => ( is => 'ro', required => 1 );
48             has 'handle_args' => ( is => 'ro', default => sub { {} } );
49             has 'default_qs_params' => ( is => 'ro', default => sub { {} } );
50             has 'scheme' => ( is => 'ro' );
51             has 'is_https' => ( is => 'ro' );
52             has 'userinfo' => ( is => 'ro' );
53             has 'max_content_length' => ( is => 'ro' );
54             has 'default_headers' => ( is => 'ro' );
55             has 'deflate' => ( is => 'ro' );
56             has 'gzip' => ( is => 'ro' );
57             has 'ssl_options' => ( is => 'ro', predicate => 'has_ssl_options' );
58             has 'handle' => ( is => 'lazy', clearer => 1 );
59             has '_pid' => ( is => 'rw', default => $$ );
60              
61             my %Code_To_Error = (
62             400 => 'Request',
63             401 => 'Unauthorized',
64             403 => 'Forbidden',
65             404 => 'Missing',
66             408 => 'RequestTimeout',
67             409 => 'Conflict',
68             413 => 'ContentLength',
69             502 => 'BadGateway',
70             503 => 'Unavailable',
71             504 => 'GatewayTimeout'
72             );
73              
74             #===================================
75 611     611 0 2092 sub stringify { shift->uri . '' }
76             #===================================
77              
78             #===================================
79             sub get_user_agent {
80             #===================================
81 176     176 0 1129 return sprintf("elasticsearch-perl/%s (%s; perl %s)", $Search::Elasticsearch::VERSION, $^O, $]);
82             }
83              
84             #===================================
85             sub get_meta_header {
86             #===================================
87 176     176 0 2174 return sprintf("es=%s,pl=%s", $Search::Elasticsearch::VERSION, $^V);
88             }
89              
90              
91             #===================================
92             sub BUILDARGS {
93             #===================================
94 175     175 0 154513 my ( $class, $params ) = parse_params(@_);
95              
96             my $node = $params->{node}
97 175   50     543 || { host => 'localhost', port => '9200' };
98              
99 175 100       443 unless ( ref $node eq 'HASH' ) {
100 170 100       586 $node = "[$node]" if Net::IP::ip_is_ipv6($node);
101 170 100       2177 unless ( $node =~ m{^http(s)?://} ) {
102 124 100       362 $node = ( $params->{use_https} ? 'https://' : 'http://' ) . $node;
103             }
104 170 100 100     527 if ( $params->{port} && $node !~ m{//[^/\[]+:\d+} ) {
105 3         22 $node =~ s{(//[^/]+)}{$1:$params->{port}};
106             }
107 170         758 my $uri = URI->new($node);
108 170         296176 $node = {
109             scheme => $uri->scheme,
110             host => $uri->host,
111             port => $uri->port,
112             path => $uri->path,
113             userinfo => $uri->userinfo
114             };
115             }
116              
117 175   100     17492 my $host = $node->{host} || 'localhost';
118 175   100     1007 my $userinfo = $node->{userinfo} || $params->{userinfo} || '';
119             my $scheme
120 175   66     425 = $node->{scheme} || ( $params->{use_https} ? 'https' : 'http' );
121             my $port
122             = $node->{port}
123             || $params->{port}
124 175   33     411 || ( $scheme eq 'http' ? 80 : 443 );
125 175   100     895 my $path = $node->{path} || $params->{path_prefix} || '';
126 175         627 $path =~ s{^/?}{/}g;
127 175         507 $path =~ s{/+$}{};
128              
129 175 50       263 my %default_headers = %{ $params->{default_headers} || {} };
  175         743  
130              
131 175 100       508 if ($userinfo) {
132 3         17 require MIME::Base64;
133 3         15 my $auth = MIME::Base64::encode_base64( $userinfo, "" );
134 3         7 chomp $auth;
135 3         9 $default_headers{Authorization} = "Basic $auth";
136             }
137              
138 175 50       579 if ( $params->{gzip} ) {
    100          
139 0         0 $default_headers{'Accept-Encoding'} = "gzip";
140             }
141              
142             elsif ( $params->{deflate} ) {
143 1         3 $default_headers{'Accept-Encoding'} = "deflate";
144             }
145              
146 175         695 $default_headers{'User-Agent'} = $class->get_user_agent();
147              
148             # Add Elastic meta header
149 175         483 $default_headers{'x-elastic-client-meta'} = $class->get_meta_header();
150              
151             # Compatibility header
152 175 0 0     498 if (defined $ENV{ELASTIC_CLIENT_APIVERSIONING} &&
      33        
153             (lc($ENV{ELASTIC_CLIENT_APIVERSIONING}) eq 'true' || $ENV{ELASTIC_CLIENT_APIVERSIONING} eq '1')) {
154 0         0 $default_headers{'Accept'} = 'application/vnd.elasticsearch+json;compatible-with=7';
155 0         0 $default_headers{'Content-Type'} = 'application/vnd.elasticsearch+json; compatible-with=7';
156             }
157              
158 175         315 $params->{scheme} = $scheme;
159 175         365 $params->{is_https} = $scheme eq 'https';
160 175         298 $params->{host} = $host;
161 175         361 $params->{port} = $port;
162 175         270 $params->{path} = $path;
163 175         282 $params->{userinfo} = $userinfo;
164 175 100       505 $host = "[$host]" if Net::IP::ip_is_ipv6($host);
165 175         1572 $params->{uri} = URI->new("$scheme://$host:$port$path");
166 175         9489 $params->{default_headers} = \%default_headers;
167              
168 175         2862 return $params;
169             }
170              
171             #===================================
172             before 'handle' => sub {
173             #===================================
174             my $self = shift;
175             if ( $$ != $self->_pid ) {
176             $self->clear_handle;
177             $self->_pid($$);
178             }
179             };
180              
181             #===================================
182 182     182 1 806 sub is_live { !shift->next_ping }
183 128     128 1 624 sub is_dead { !!shift->next_ping }
184             #===================================
185              
186             #===================================
187             sub mark_live {
188             #===================================
189 149     149 1 1458 my $self = shift;
190 149         295 $self->ping_failures(0);
191 149         278 $self->next_ping(0);
192             }
193              
194             #===================================
195             sub mark_dead {
196             #===================================
197 143     143 1 5657 my $self = shift;
198 143         278 my $fails = $self->ping_failures;
199 143         277 $self->ping_failures( $fails + 1 );
200              
201 143         527 my $timeout
202             = min( $self->dead_timeout * 2**$fails, $self->max_dead_timeout );
203 143         274 my $next = $self->next_ping( time() + $timeout );
204              
205 143         313 $self->logger->infof( 'Marking [%s] as dead. Next ping at: %s',
206             $self->stringify, scalar localtime($next) );
207              
208             }
209              
210             #===================================
211             sub force_ping {
212             #===================================
213 121     121 1 249 my $self = shift;
214 121         306 $self->ping_failures(0);
215 121         770 $self->next_ping(-1);
216             }
217              
218             #===================================
219             sub pings_ok {
220             #===================================
221 47     47 1 62 my $self = shift;
222 47         124 $self->logger->infof( 'Pinging [%s]', $self->stringify );
223             return try {
224 47     47   3287 $self->perform_request(
225             { method => 'HEAD',
226             path => '/',
227             timeout => $self->ping_timeout,
228             }
229             );
230 33         126 $self->logger->infof( 'Marking [%s] as live', $self->stringify );
231 33         1170 $self->mark_live;
232 33         64 1;
233             }
234             catch {
235 14     14   579 $self->logger->debug("$_");
236 14         322 $self->mark_dead;
237 14         1096 0;
238 47         2065 };
239             }
240              
241             #===================================
242             sub sniff {
243             #===================================
244 38     38 1 59 my $self = shift;
245 38         101 $self->logger->infof( 'Sniffing [%s]', $self->stringify );
246             return try {
247             $self->perform_request(
248             { method => 'GET',
249             path => '/_nodes/http',
250             qs => { timeout => $self->sniff_timeout . 's' },
251             timeout => $self->sniff_request_timeout,
252             }
253 38     38   2753 )->{nodes};
254             }
255             catch {
256 13     13   370 $self->logger->debug($_);
257 13         394 return;
258 38         1640 };
259             }
260              
261             #===================================
262             sub build_uri {
263             #===================================
264 5     5 1 12 my ( $self, $params ) = @_;
265 5         25 my $uri = $self->uri->clone;
266 5         32 $uri->path( $uri->path . $params->{path} );
267 5 100       208 my %qs = ( %{ $self->default_qs_params }, %{ $params->{qs} || {} } );
  5         14  
  5         20  
268 5         45 $uri->query_form( \%qs );
269 5         288 return $uri;
270             }
271              
272             #===================================
273             before 'perform_request' => sub {
274             #===================================
275             my ( $self, $params ) = @_;
276             return unless defined $params->{data};
277              
278             $self->_compress_body($params);
279              
280             my $max = $self->max_content_length
281             or return;
282              
283             return if length( $params->{data} ) < $max;
284              
285             $self->logger->throw_error( 'ContentLength',
286             "Body is longer than max_content_length ($max)",
287             );
288             };
289              
290             #===================================
291             sub _compress_body {
292             #===================================
293 0     0   0 my ( $self, $params ) = @_;
294 0         0 my $output;
295 0 0       0 if ( $self->gzip ) {
    0          
296 0 0       0 IO::Compress::Gzip::gzip( \( $params->{data} ), \$output )
297             or throw( 'Request',
298             "Couldn't gzip request: $IO::Compress::Gzip::GzipError" );
299 0         0 $params->{data} = $output;
300 0         0 $params->{encoding} = 'gzip';
301             }
302             elsif ( $self->deflate ) {
303 0 0       0 IO::Compress::Deflate::deflate( \( $params->{data} ), \$output )
304             or throw( 'Request',
305             "Couldn't deflate request: $IO::Compress::Deflate::DeflateError" );
306 0         0 $params->{data} = $output;
307 0         0 $params->{encoding} = 'deflate';
308             }
309             }
310              
311             #===================================
312             sub _decompress_body {
313             #===================================
314 238     238   347 my ( $self, $body_ref, $headers ) = @_;
315 238 50       543 if ( my $encoding = $headers->{'content-encoding'} ) {
316 0         0 my $output;
317 0 0       0 if ( $encoding eq 'gzip' ) {
    0          
318 0 0       0 IO::Uncompress::Gunzip::gunzip( $body_ref, \$output )
319             or throw(
320             'Request',
321             "Couldn't gunzip response: $IO::Uncompress::Gunzip::GunzipError"
322             );
323             }
324             elsif ( $encoding eq 'deflate' ) {
325 0 0       0 IO::Uncompress::Inflate::inflate( $body_ref, \$output,
326             Transparent => 0 )
327             or throw(
328             'Request',
329             "Couldn't inflate response: $IO::Uncompress::Inflate::InflateError"
330             );
331             }
332             else {
333 0         0 throw( 'Request', "Unknown content-encoding: $encoding" );
334             }
335 0         0 ${$body_ref} = $output;
  0         0  
336             }
337             }
338              
339             #===================================
340             sub process_response {
341             #===================================
342 238     238 1 25587 my ( $self, $params, $code, $msg, $body, $headers ) = @_;
343 238         621 $self->_decompress_body( \$body, $headers );
344              
345 238   100     775 my ($mime_type) = split /\s*;\s*/, ( $headers->{'content-type'} || '' );
346              
347 238   100     750 my $is_encoded = $mime_type && $mime_type ne 'text/plain';
348              
349             # Deprecation warnings
350 238 50       447 if ( my $warnings = $headers->{warning} ) {
351 0         0 my $warning_string = _parse_warnings($warnings);
352 0         0 my %temp = (%$params);
353 0         0 delete $temp{data};
354 0         0 $self->logger->deprecation( $warning_string, \%temp );
355             }
356              
357             # Request is successful
358              
359 238 100 66     693 if ( $code >= 200 and $code <= 209 ) {
360 176 100 100     619 if ( defined $body and length $body ) {
361 141 100       611 $body = $self->serializer->decode($body)
362             if $is_encoded;
363 141         1528 return $code, $body;
364             }
365 35 100       114 return ( $code, 1 ) if $params->{method} eq 'HEAD';
366 1         4 return ( $code, '' );
367             }
368              
369             # Check if the error should be ignored
370 62         199 my @ignore = to_list( $params->{ignore} );
371 62 100       166 push @ignore, 404 if $params->{method} eq 'HEAD';
372 62 100       137 return ($code) if grep { $_ eq $code } @ignore;
  12         38  
373              
374             # Determine error type
375 60         128 my $error_type = $Code_To_Error{$code};
376 60 100       122 unless ($error_type) {
377 49 50 33     128 if ( defined $body and length $body ) {
378 0         0 $msg = $body;
379 0         0 $body = undef;
380             }
381 49         136 $error_type = $self->error_from_text( $code, $msg );
382             }
383              
384 60 50       216 delete $params->{data} if $params->{body};
385 60         178 my %error_args = ( status_code => $code, request => $params );
386              
387             # Extract error message from the body, if present
388              
389 60 100       237 if ( $body = $self->serializer->decode($body) ) {
390 2         21 $error_args{body} = $body;
391 2   33     5 $msg = $self->_munge_elasticsearch_exception($body) || $msg;
392              
393 2 50 33     7 $error_args{current_version} = $1
394             if $error_type eq 'Conflict'
395             and $msg =~ /: version conflict, current (?:version )?\[(\d+)\]/;
396             }
397 60   33     135 $msg ||= $error_type;
398              
399 60         101 chomp $msg;
400 60         153 throw( $error_type, "[" . $self->stringify . "]-[$code] $msg",
401             \%error_args );
402             }
403              
404             #===================================
405             sub _parse_warnings {
406             #===================================
407 0 0   0   0 my @warnings = ref $_[0] eq 'ARRAY' ? @{ shift() } : shift();
  0         0  
408 0         0 my @str;
409 0         0 for (@warnings) {
410 0 0       0 if ( $_ =~ /^\d+\s+\S+\s+"((?:\\"|[^"])+)"/ ) {
411 0         0 my $msg = $1;
412 0         0 $msg =~ s/\\"/"/g, push @str, $msg;
413             }
414             else {
415 0         0 push @str, $_;
416             }
417             }
418 0         0 return join "; ", @str;
419             }
420              
421             #===================================
422             sub _munge_elasticsearch_exception {
423             #===================================
424 2     2   5 my ( $self, $body ) = @_;
425 2 50       5 return $body unless ref $body eq 'HASH';
426 2   50     10 my $error = $body->{error} || return;
427 2 50       6 return $error unless ref $error eq 'HASH';
428              
429 0   0       my $root_causes = $error->{root_cause} || [];
430 0 0         unless (@$root_causes) {
431 0 0         my $msg = "[" . $error->{type} . "] " if $error->{type};
432 0 0         $msg .= $error->{reason} if $error->{reason};
433 0           return $msg;
434             }
435              
436 0           my $json = $self->serializer;
437 0           my @msgs;
438 0           for (@$root_causes) {
439 0           my %cause = (%$_);
440             my $msg
441 0           = "[" . ( delete $cause{type} ) . "] " . ( delete $cause{reason} );
442 0 0         if ( keys %cause ) {
443 0           $msg .= ", with: " . $json->encode( \%cause );
444             }
445 0           push @msgs, $msg;
446             }
447 0           return ( join ", ", @msgs );
448             }
449              
450             1;
451              
452             # ABSTRACT: Provides common functionality to HTTP Cxn implementations
453              
454             __END__