File Coverage

blib/lib/Search/Elasticsearch/Role/Cxn.pm
Criterion Covered Total %
statement 157 208 75.4
branch 50 92 54.3
condition 34 58 58.6
subroutine 31 33 93.9
pod 9 13 69.2
total 281 404 69.5


line stmt bran cond sub pod time code
1             # Licensed to Elasticsearch B.V. under one or more contributor
2             # license agreements. See the NOTICE file distributed with
3             # this work for additional information regarding copyright
4             # ownership. Elasticsearch B.V. licenses this file to you under
5             # the Apache License, Version 2.0 (the "License"); you may
6             # not use this file except in compliance with the License.
7             # You may obtain a copy of the License at
8             #
9             # http://www.apache.org/licenses/LICENSE-2.0
10             #
11             # Unless required by applicable law or agreed to in writing,
12             # software distributed under the License is distributed on an
13             # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14             # KIND, either express or implied. See the License for the
15             # specific language governing permissions and limitations
16             # under the License.
17              
18             package Search::Elasticsearch::Role::Cxn;
19             $Search::Elasticsearch::Role::Cxn::VERSION = '8.00';
20             our $PRODUCT_CHECK_HEADER = 'x-elastic-product';
21             our $PRODUCT_CHECK_VALUE = 'Elasticsearch';
22              
23 55     55   77609 use Moo::Role;
  55         207759  
  55         340  
24 55     55   19518 use Search::Elasticsearch::Util qw(parse_params throw to_list);
  55         490  
  55         446  
25 55     55   25306 use List::Util qw(min);
  55         279  
  55         5260  
26 55     55   364 use Try::Tiny;
  55         100  
  55         2810  
27 55     55   13038 use URI();
  55         114227  
  55         995  
28 55     55   24669 use IO::Compress::Deflate();
  55         1718044  
  55         1449  
29 55     55   25091 use IO::Uncompress::Inflate();
  55         686518  
  55         1315  
30 55     55   26198 use IO::Compress::Gzip();
  55         302474  
  55         1548  
31 55     55   26269 use IO::Uncompress::Gunzip qw(gunzip $GunzipError);
  55         100953  
  55         5306  
32 55     55   406 use Search::Elasticsearch::Util qw(to_list);
  55         119  
  55         511  
33 55     55   17739 use namespace::clean;
  55         129  
  55         488  
34 55     55   51840 use Net::IP;
  55         2846062  
  55         158371  
35              
36             requires qw(perform_request error_from_text handle);
37              
38             has 'host' => ( is => 'ro', required => 1 );
39             has 'port' => ( is => 'ro', required => 1 );
40             has 'uri' => ( is => 'ro', required => 1 );
41             has 'request_timeout' => ( is => 'ro', default => 30 );
42             has 'ping_timeout' => ( is => 'ro', default => 2 );
43             has 'sniff_timeout' => ( is => 'ro', default => 1 );
44             has 'sniff_request_timeout' => ( is => 'ro', default => 2 );
45             has 'next_ping' => ( is => 'rw', default => 0 );
46             has 'ping_failures' => ( is => 'rw', default => 0 );
47             has 'dead_timeout' => ( is => 'ro', default => 60 );
48             has 'max_dead_timeout' => ( is => 'ro', default => 3600 );
49             has 'serializer' => ( is => 'ro', required => 1 );
50             has 'logger' => ( is => 'ro', required => 1 );
51             has 'handle_args' => ( is => 'ro', default => sub { {} } );
52             has 'default_qs_params' => ( is => 'ro', default => sub { {} } );
53             has 'scheme' => ( is => 'ro' );
54             has 'is_https' => ( is => 'ro' );
55             has 'userinfo' => ( is => 'ro' );
56             has 'max_content_length' => ( is => 'ro' );
57             has 'default_headers' => ( is => 'ro' );
58             has 'deflate' => ( is => 'ro' );
59             has 'gzip' => ( is => 'ro' );
60             has 'ssl_options' => ( is => 'ro', predicate => 'has_ssl_options' );
61             has 'handle' => ( is => 'lazy', clearer => 1 );
62             has '_pid' => ( is => 'rw', default => $$ );
63              
64             my %Code_To_Error = (
65             400 => 'Request',
66             401 => 'Unauthorized',
67             403 => 'Forbidden',
68             404 => 'Missing',
69             408 => 'RequestTimeout',
70             409 => 'Conflict',
71             413 => 'ContentLength',
72             502 => 'BadGateway',
73             503 => 'Unavailable',
74             504 => 'GatewayTimeout'
75             );
76              
77             #===================================
78 611     611 0 2461 sub stringify { shift->uri . '' }
79             #===================================
80              
81             #===================================
82             sub get_user_agent {
83             #===================================
84 176     176 0 1235 return sprintf("elasticsearch-perl/%s (%s; perl %s)", $Search::Elasticsearch::VERSION, $^O, $]);
85             }
86              
87             #===================================
88             sub get_meta_header {
89             #===================================
90 176     176 0 2616 return sprintf("es=%s,pl=%s", $Search::Elasticsearch::VERSION, $^V);
91             }
92              
93              
94             #===================================
95             sub BUILDARGS {
96             #===================================
97 175     175 0 187165 my ( $class, $params ) = parse_params(@_);
98              
99             my $node = $params->{node}
100 175   50     667 || { host => 'localhost', port => '9200' };
101              
102 175 100       525 unless ( ref $node eq 'HASH' ) {
103 170 100       677 $node = "[$node]" if Net::IP::ip_is_ipv6($node);
104 170 100       2650 unless ( $node =~ m{^http(s)?://} ) {
105 124 100       440 $node = ( $params->{use_https} ? 'https://' : 'http://' ) . $node;
106             }
107 170 100 100     657 if ( $params->{port} && $node !~ m{//[^/\[]+:\d+} ) {
108 3         27 $node =~ s{(//[^/]+)}{$1:$params->{port}};
109             }
110 170         877 my $uri = URI->new($node);
111 170         363386 $node = {
112             scheme => $uri->scheme,
113             host => $uri->host,
114             port => $uri->port,
115             path => $uri->path,
116             userinfo => $uri->userinfo
117             };
118             }
119              
120 175   100     21780 my $host = $node->{host} || 'localhost';
121 175   100     1165 my $userinfo = $node->{userinfo} || $params->{userinfo} || '';
122             my $scheme
123 175   66     482 = $node->{scheme} || ( $params->{use_https} ? 'https' : 'http' );
124             my $port
125             = $node->{port}
126             || $params->{port}
127 175   33     492 || ( $scheme eq 'http' ? 80 : 443 );
128 175   100     1022 my $path = $node->{path} || $params->{path_prefix} || '';
129 175         771 $path =~ s{^/?}{/}g;
130 175         612 $path =~ s{/+$}{};
131              
132 175 50       377 my %default_headers = %{ $params->{default_headers} || {} };
  175         884  
133              
134 175 100       570 if ($userinfo) {
135 3         21 require MIME::Base64;
136 3         21 my $auth = MIME::Base64::encode_base64( $userinfo, "" );
137 3         9 chomp $auth;
138 3         12 $default_headers{Authorization} = "Basic $auth";
139             }
140              
141 175 50       624 if ( $params->{gzip} ) {
    100          
142 0         0 $default_headers{'Accept-Encoding'} = "gzip";
143             }
144              
145             elsif ( $params->{deflate} ) {
146 1         3 $default_headers{'Accept-Encoding'} = "deflate";
147             }
148              
149 175         746 $default_headers{'User-Agent'} = $class->get_user_agent();
150              
151             # Add Elastic meta header
152 175         564 $default_headers{'x-elastic-client-meta'} = $class->get_meta_header();
153              
154             # Compatibility header
155 175 0 0     613 if (defined $ENV{ELASTIC_CLIENT_APIVERSIONING} &&
      33        
156             (lc($ENV{ELASTIC_CLIENT_APIVERSIONING}) eq 'true' || $ENV{ELASTIC_CLIENT_APIVERSIONING} eq '1')) {
157 0         0 $default_headers{'Accept'} = 'application/vnd.elasticsearch+json;compatible-with=7';
158 0         0 $default_headers{'Content-Type'} = 'application/vnd.elasticsearch+json; compatible-with=7';
159             }
160              
161 175 50 33     522 if (defined $params->{elastic_cloud_api_key} && defined $params->{token_api}) {
162 0         0 throw( 'Request',
163             "You cannot set elastic_cloud_api_key and token_api together" );
164             }
165              
166             # Elastic cloud API key
167 175 50       437 if (defined $params->{elastic_cloud_api_key}) {
168 0         0 $default_headers{'Authorization'} = sprintf("ApiKey %s", $params->{elastic_cloud_api_key});
169             }
170              
171             # Elasticsearch token API (https://www.elastic.co/guide/en/elasticsearch/reference/current/security-api-get-token.html)
172 175 50       421 if (defined $params->{token_api}) {
173 0         0 $default_headers{'Authorization'} = sprintf("Bearer %s", $params->{token_api});
174             }
175              
176             # Elasticsearch
177 175         375 $params->{scheme} = $scheme;
178 175         402 $params->{is_https} = $scheme eq 'https';
179 175         516 $params->{host} = $host;
180 175         392 $params->{port} = $port;
181 175         344 $params->{path} = $path;
182 175         322 $params->{userinfo} = $userinfo;
183 175 100       549 $host = "[$host]" if Net::IP::ip_is_ipv6($host);
184 175         1946 $params->{uri} = URI->new("$scheme://$host:$port$path");
185 175         11812 $params->{default_headers} = \%default_headers;
186              
187 175         3610 return $params;
188             }
189              
190             #===================================
191             before 'handle' => sub {
192             #===================================
193             my $self = shift;
194             if ( $$ != $self->_pid ) {
195             $self->clear_handle;
196             $self->_pid($$);
197             }
198             };
199              
200             #===================================
201 182     182 1 905 sub is_live { !shift->next_ping }
202 128     128 1 693 sub is_dead { !!shift->next_ping }
203             #===================================
204              
205             #===================================
206             sub mark_live {
207             #===================================
208 149     149 1 1360 my $self = shift;
209 149         336 $self->ping_failures(0);
210 149         330 $self->next_ping(0);
211             }
212              
213             #===================================
214             sub mark_dead {
215             #===================================
216 143     143 1 7187 my $self = shift;
217 143         370 my $fails = $self->ping_failures;
218 143         332 $self->ping_failures( $fails + 1 );
219              
220 143         626 my $timeout
221             = min( $self->dead_timeout * 2**$fails, $self->max_dead_timeout );
222 143         337 my $next = $self->next_ping( time() + $timeout );
223              
224 143         401 $self->logger->infof( 'Marking [%s] as dead. Next ping at: %s',
225             $self->stringify, scalar localtime($next) );
226              
227             }
228              
229             #===================================
230             sub force_ping {
231             #===================================
232 121     121 1 298 my $self = shift;
233 121         355 $self->ping_failures(0);
234 121         1180 $self->next_ping(-1);
235             }
236              
237             #===================================
238             sub pings_ok {
239             #===================================
240 47     47 1 70 my $self = shift;
241 47         137 $self->logger->infof( 'Pinging [%s]', $self->stringify );
242             return try {
243 47     47   3670 $self->perform_request(
244             { method => 'HEAD',
245             path => '/',
246             timeout => $self->ping_timeout,
247             }
248             );
249 33         159 $self->logger->infof( 'Marking [%s] as live', $self->stringify );
250 33         1412 $self->mark_live;
251 33         73 1;
252             }
253             catch {
254 14     14   627 $self->logger->debug("$_");
255 14         403 $self->mark_dead;
256 14         1229 0;
257 47         2356 };
258             }
259              
260             #===================================
261             sub sniff {
262             #===================================
263 38     38 1 63 my $self = shift;
264 38         135 $self->logger->infof( 'Sniffing [%s]', $self->stringify );
265             return try {
266             $self->perform_request(
267             { method => 'GET',
268             path => '/_nodes/http',
269             qs => { timeout => $self->sniff_timeout . 's' },
270             timeout => $self->sniff_request_timeout,
271             }
272 38     38   3604 )->{nodes};
273             }
274             catch {
275 13     13   498 $self->logger->debug($_);
276 13         528 return;
277 38         2210 };
278             }
279              
280             #===================================
281             sub build_uri {
282             #===================================
283 5     5 1 15 my ( $self, $params ) = @_;
284 5         31 my $uri = $self->uri->clone;
285 5         41 $uri->path( $uri->path . $params->{path} );
286 5 100       241 my %qs = ( %{ $self->default_qs_params }, %{ $params->{qs} || {} } );
  5         18  
  5         30  
287 5         27 $uri->query_form( \%qs );
288 5         379 return $uri;
289             }
290              
291             #===================================
292             before 'perform_request' => sub {
293             #===================================
294             my ( $self, $params ) = @_;
295             return unless defined $params->{data};
296              
297             $self->_compress_body($params);
298              
299             my $max = $self->max_content_length
300             or return;
301              
302             return if length( $params->{data} ) < $max;
303              
304             $self->logger->throw_error( 'ContentLength',
305             "Body is longer than max_content_length ($max)",
306             );
307             };
308              
309             #===================================
310             sub _compress_body {
311             #===================================
312 0     0   0 my ( $self, $params ) = @_;
313 0         0 my $output;
314 0 0       0 if ( $self->gzip ) {
    0          
315 0 0       0 IO::Compress::Gzip::gzip( \( $params->{data} ), \$output )
316             or throw( 'Request',
317             "Couldn't gzip request: $IO::Compress::Gzip::GzipError" );
318 0         0 $params->{data} = $output;
319 0         0 $params->{encoding} = 'gzip';
320             }
321             elsif ( $self->deflate ) {
322 0 0       0 IO::Compress::Deflate::deflate( \( $params->{data} ), \$output )
323             or throw( 'Request',
324             "Couldn't deflate request: $IO::Compress::Deflate::DeflateError" );
325 0         0 $params->{data} = $output;
326 0         0 $params->{encoding} = 'deflate';
327             }
328             }
329              
330             #===================================
331             sub _decompress_body {
332             #===================================
333 239     239   460 my ( $self, $body_ref, $headers ) = @_;
334 239 50       584 if ( my $encoding = $headers->{'content-encoding'} ) {
335 0         0 my $output;
336 0 0       0 if ( $encoding eq 'gzip' ) {
    0          
337 0 0       0 IO::Uncompress::Gunzip::gunzip( $body_ref, \$output )
338             or throw(
339             'Request',
340             "Couldn't gunzip response: $IO::Uncompress::Gunzip::GunzipError"
341             );
342             }
343             elsif ( $encoding eq 'deflate' ) {
344 0 0       0 IO::Uncompress::Inflate::inflate( $body_ref, \$output,
345             Transparent => 0 )
346             or throw(
347             'Request',
348             "Couldn't inflate response: $IO::Uncompress::Inflate::InflateError"
349             );
350             }
351             else {
352 0         0 throw( 'Request', "Unknown content-encoding: $encoding" );
353             }
354 0         0 ${$body_ref} = $output;
  0         0  
355             }
356             }
357              
358             #===================================
359             sub process_response {
360             #===================================
361 240     240 1 29316 my ( $self, $params, $code, $msg, $body, $headers ) = @_;
362              
363             # Product check
364 240 100 66     1055 if ( $code >= 200 and $code < 300 ) {
365 178   100     491 my $product = $headers->{$PRODUCT_CHECK_HEADER} // '';
366 178 100       482 if ($product ne $PRODUCT_CHECK_VALUE) {
367 1         4 throw( "ProductCheck", "The client noticed that the server is not Elasticsearch and we do not support this unknown product" );
368             }
369             }
370              
371 239         676 $self->_decompress_body( \$body, $headers );
372              
373 239   100     977 my ($mime_type) = split /\s*;\s*/, ( $headers->{'content-type'} || '' );
374              
375 239   100     808 my $is_encoded = $mime_type && $mime_type ne 'text/plain';
376              
377             # Deprecation warnings
378 239 50       557 if ( my $warnings = $headers->{warning} ) {
379 0         0 my $warning_string = _parse_warnings($warnings);
380 0         0 my %temp = (%$params);
381 0         0 delete $temp{data};
382 0         0 $self->logger->deprecation( $warning_string, \%temp );
383             }
384              
385             # Request is successful
386 239 100 66     846 if ( $code >= 200 and $code <= 209 ) {
387 177 100 100     629 if ( defined $body and length $body ) {
388 141 100       720 $body = $self->serializer->decode($body)
389             if $is_encoded;
390 141         1838 return $code, $body;
391             }
392 36 100       155 return ( $code, 1 ) if $params->{method} eq 'HEAD';
393 2         7 return ( $code, '' );
394             }
395              
396             # Check if the error should be ignored
397 62         254 my @ignore = to_list( $params->{ignore} );
398 62 100       216 push @ignore, 404 if $params->{method} eq 'HEAD';
399 62 100       181 return ($code) if grep { $_ eq $code } @ignore;
  12         43  
400              
401             # Determine error type
402 60         142 my $error_type = $Code_To_Error{$code};
403 60 100       155 unless ($error_type) {
404 49 50 33     157 if ( defined $body and length $body ) {
405 0         0 $msg = $body;
406 0         0 $body = undef;
407             }
408 49         154 $error_type = $self->error_from_text( $code, $msg );
409             }
410              
411 60 50       230 delete $params->{data} if $params->{body};
412 60         201 my %error_args = ( status_code => $code, request => $params );
413              
414             # Extract error message from the body, if present
415              
416 60 100       261 if ( $body = $self->serializer->decode($body) ) {
417 2         23 $error_args{body} = $body;
418 2   33     7 $msg = $self->_munge_elasticsearch_exception($body) || $msg;
419              
420 2 50 33     7 $error_args{current_version} = $1
421             if $error_type eq 'Conflict'
422             and $msg =~ /: version conflict, current (?:version )?\[(\d+)\]/;
423             }
424 60   33     216 $msg ||= $error_type;
425              
426 60         122 chomp $msg;
427 60         173 throw( $error_type, "[" . $self->stringify . "]-[$code] $msg",
428             \%error_args );
429             }
430              
431             #===================================
432             sub _parse_warnings {
433             #===================================
434 0 0   0   0 my @warnings = ref $_[0] eq 'ARRAY' ? @{ shift() } : shift();
  0         0  
435 0         0 my @str;
436 0         0 for (@warnings) {
437 0 0       0 if ( $_ =~ /^\d+\s+\S+\s+"((?:\\"|[^"])+)"/ ) {
438 0         0 my $msg = $1;
439 0         0 $msg =~ s/\\"/"/g, push @str, $msg;
440             }
441             else {
442 0         0 push @str, $_;
443             }
444             }
445 0         0 return join "; ", @str;
446             }
447              
448             #===================================
449             sub _munge_elasticsearch_exception {
450             #===================================
451 2     2   4 my ( $self, $body ) = @_;
452 2 50       6 return $body unless ref $body eq 'HASH';
453 2   50     5 my $error = $body->{error} || return;
454 2 50       9 return $error unless ref $error eq 'HASH';
455              
456 0   0       my $root_causes = $error->{root_cause} || [];
457 0 0         unless (@$root_causes) {
458 0 0         my $msg = "[" . $error->{type} . "] " if $error->{type};
459 0 0         $msg .= $error->{reason} if $error->{reason};
460 0           return $msg;
461             }
462              
463 0           my $json = $self->serializer;
464 0           my @msgs;
465 0           for (@$root_causes) {
466 0           my %cause = (%$_);
467             my $msg
468 0           = "[" . ( delete $cause{type} ) . "] " . ( delete $cause{reason} );
469 0 0         if ( keys %cause ) {
470 0           $msg .= ", with: " . $json->encode( \%cause );
471             }
472 0           push @msgs, $msg;
473             }
474 0           return ( join ", ", @msgs );
475             }
476              
477             1;
478              
479             # ABSTRACT: Provides common functionality to HTTP Cxn implementations
480              
481             __END__