File Coverage

blib/lib/Search/Elasticsearch/Role/Cxn.pm
Criterion Covered Total %
statement 147 193 76.1
branch 43 80 53.7
condition 28 44 63.6
subroutine 30 32 93.7
pod 9 12 75.0
total 257 361 71.1


line stmt bran cond sub pod time code
1             # Licensed to Elasticsearch B.V. under one or more contributor
2             # license agreements. See the NOTICE file distributed with
3             # this work for additional information regarding copyright
4             # ownership. Elasticsearch B.V. licenses this file to you under
5             # the Apache License, Version 2.0 (the "License"); you may
6             # not use this file except in compliance with the License.
7             # You may obtain a copy of the License at
8             #
9             # http://www.apache.org/licenses/LICENSE-2.0
10             #
11             # Unless required by applicable law or agreed to in writing,
12             # software distributed under the License is distributed on an
13             # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14             # KIND, either express or implied. See the License for the
15             # specific language governing permissions and limitations
16             # under the License.
17              
18             package Search::Elasticsearch::Role::Cxn;
19             $Search::Elasticsearch::Role::Cxn::VERSION = '7.715';
20 55     55   320874 use Moo::Role;
  55         7726  
  55         377  
21 55     55   20475 use Search::Elasticsearch::Util qw(parse_params throw to_list);
  55         136  
  55         906  
22 55     55   26332 use List::Util qw(min);
  55         121  
  55         5693  
23 55     55   411 use Try::Tiny;
  55         104  
  55         3020  
24 55     55   13526 use URI();
  55         112486  
  55         1101  
25 55     55   27534 use IO::Compress::Deflate();
  55         1818884  
  55         1497  
26 55     55   27654 use IO::Uncompress::Inflate();
  55         733102  
  55         1495  
27 55     55   29797 use IO::Compress::Gzip();
  55         320946  
  55         1778  
28 55     55   28699 use IO::Uncompress::Gunzip qw(gunzip $GunzipError);
  55         107749  
  55         6025  
29 55     55   481 use Search::Elasticsearch::Util qw(to_list);
  55         133  
  55         581  
30 55     55   18444 use namespace::clean;
  55         131  
  55         587  
31 55     55   55344 use Net::IP;
  55         2973384  
  55         153050  
32              
33             requires qw(perform_request error_from_text handle);
34              
35             has 'host' => ( is => 'ro', required => 1 );
36             has 'port' => ( is => 'ro', required => 1 );
37             has 'uri' => ( is => 'ro', required => 1 );
38             has 'request_timeout' => ( is => 'ro', default => 30 );
39             has 'ping_timeout' => ( is => 'ro', default => 2 );
40             has 'sniff_timeout' => ( is => 'ro', default => 1 );
41             has 'sniff_request_timeout' => ( is => 'ro', default => 2 );
42             has 'next_ping' => ( is => 'rw', default => 0 );
43             has 'ping_failures' => ( is => 'rw', default => 0 );
44             has 'dead_timeout' => ( is => 'ro', default => 60 );
45             has 'max_dead_timeout' => ( is => 'ro', default => 3600 );
46             has 'serializer' => ( is => 'ro', required => 1 );
47             has 'logger' => ( is => 'ro', required => 1 );
48             has 'handle_args' => ( is => 'ro', default => sub { {} } );
49             has 'default_qs_params' => ( is => 'ro', default => sub { {} } );
50             has 'scheme' => ( is => 'ro' );
51             has 'is_https' => ( is => 'ro' );
52             has 'userinfo' => ( is => 'ro' );
53             has 'max_content_length' => ( is => 'ro' );
54             has 'default_headers' => ( is => 'ro' );
55             has 'deflate' => ( is => 'ro' );
56             has 'gzip' => ( is => 'ro' );
57             has 'ssl_options' => ( is => 'ro', predicate => 'has_ssl_options' );
58             has 'handle' => ( is => 'lazy', clearer => 1 );
59             has '_pid' => ( is => 'rw', default => $$ );
60              
61             my %Code_To_Error = (
62             400 => 'Request',
63             401 => 'Unauthorized',
64             403 => 'Forbidden',
65             404 => 'Missing',
66             408 => 'RequestTimeout',
67             409 => 'Conflict',
68             413 => 'ContentLength',
69             502 => 'BadGateway',
70             503 => 'Unavailable',
71             504 => 'GatewayTimeout'
72             );
73              
74             #===================================
75 611     611 0 2732 sub stringify { shift->uri . '' }
76             #===================================
77              
78             #===================================
79             sub get_user_agent {
80             #===================================
81 176     176 0 1443 return sprintf("elasticsearch-perl/%s (%s; perl %s)", $Search::Elasticsearch::VERSION, $^O, $]);
82             }
83              
84             #===================================
85             sub BUILDARGS {
86             #===================================
87 175     175 0 191188 my ( $class, $params ) = parse_params(@_);
88              
89             my $node = $params->{node}
90 175   50     790 || { host => 'localhost', port => '9200' };
91              
92 175 100       579 unless ( ref $node eq 'HASH' ) {
93 170 100       806 $node = "[$node]" if Net::IP::ip_is_ipv6($node);
94 170 100       2904 unless ( $node =~ m{^http(s)?://} ) {
95 124 100       461 $node = ( $params->{use_https} ? 'https://' : 'http://' ) . $node;
96             }
97 170 100 100     654 if ( $params->{port} && $node !~ m{//[^/\[]+:\d+} ) {
98 3         27 $node =~ s{(//[^/]+)}{$1:$params->{port}};
99             }
100 170         1026 my $uri = URI->new($node);
101 170         380920 $node = {
102             scheme => $uri->scheme,
103             host => $uri->host,
104             port => $uri->port,
105             path => $uri->path,
106             userinfo => $uri->userinfo
107             };
108             }
109              
110 175   100     22736 my $host = $node->{host} || 'localhost';
111 175   100     1272 my $userinfo = $node->{userinfo} || $params->{userinfo} || '';
112             my $scheme
113 175   66     561 = $node->{scheme} || ( $params->{use_https} ? 'https' : 'http' );
114             my $port
115             = $node->{port}
116             || $params->{port}
117 175   33     629 || ( $scheme eq 'http' ? 80 : 443 );
118 175   100     1121 my $path = $node->{path} || $params->{path_prefix} || '';
119 175         812 $path =~ s{^/?}{/}g;
120 175         651 $path =~ s{/+$}{};
121              
122 175 50       362 my %default_headers = %{ $params->{default_headers} || {} };
  175         956  
123              
124 175 100       635 if ($userinfo) {
125 3         27 require MIME::Base64;
126 3         22 my $auth = MIME::Base64::encode_base64( $userinfo, "" );
127 3         12 chomp $auth;
128 3         11 $default_headers{Authorization} = "Basic $auth";
129             }
130              
131 175 50       803 if ( $params->{gzip} ) {
    100          
132 0         0 $default_headers{'Accept-Encoding'} = "gzip";
133             }
134              
135             elsif ( $params->{deflate} ) {
136 1         3 $default_headers{'Accept-Encoding'} = "deflate";
137             }
138              
139 175         884 $default_headers{'User-Agent'} = $class->get_user_agent();
140              
141 175         478 $params->{scheme} = $scheme;
142 175         501 $params->{is_https} = $scheme eq 'https';
143 175         729 $params->{host} = $host;
144 175         425 $params->{port} = $port;
145 175         405 $params->{path} = $path;
146 175         348 $params->{userinfo} = $userinfo;
147 175 100       632 $host = "[$host]" if Net::IP::ip_is_ipv6($host);
148 175         2013 $params->{uri} = URI->new("$scheme://$host:$port$path");
149 175         11588 $params->{default_headers} = \%default_headers;
150              
151 175         3569 return $params;
152             }
153              
154             #===================================
155             before 'handle' => sub {
156             #===================================
157             my $self = shift;
158             if ( $$ != $self->_pid ) {
159             $self->clear_handle;
160             $self->_pid($$);
161             }
162             };
163              
164             #===================================
165 182     182 1 934 sub is_live { !shift->next_ping }
166 128     128 1 745 sub is_dead { !!shift->next_ping }
167             #===================================
168              
169             #===================================
170             sub mark_live {
171             #===================================
172 149     149 1 1250 my $self = shift;
173 149         355 $self->ping_failures(0);
174 149         341 $self->next_ping(0);
175             }
176              
177             #===================================
178             sub mark_dead {
179             #===================================
180 143     143 1 6281 my $self = shift;
181 143         331 my $fails = $self->ping_failures;
182 143         303 $self->ping_failures( $fails + 1 );
183              
184 143         603 my $timeout
185             = min( $self->dead_timeout * 2**$fails, $self->max_dead_timeout );
186 143         324 my $next = $self->next_ping( time() + $timeout );
187              
188 143         337 $self->logger->infof( 'Marking [%s] as dead. Next ping at: %s',
189             $self->stringify, scalar localtime($next) );
190              
191             }
192              
193             #===================================
194             sub force_ping {
195             #===================================
196 121     121 1 300 my $self = shift;
197 121         394 $self->ping_failures(0);
198 121         1056 $self->next_ping(-1);
199             }
200              
201             #===================================
202             sub pings_ok {
203             #===================================
204 47     47 1 72 my $self = shift;
205 47         158 $self->logger->infof( 'Pinging [%s]', $self->stringify );
206             return try {
207 47     47   4170 $self->perform_request(
208             { method => 'HEAD',
209             path => '/',
210             timeout => $self->ping_timeout,
211             }
212             );
213 33         171 $self->logger->infof( 'Marking [%s] as live', $self->stringify );
214 33         1546 $self->mark_live;
215 33         86 1;
216             }
217             catch {
218 14     14   667 $self->logger->debug("$_");
219 14         416 $self->mark_dead;
220 14         1408 0;
221 47         2794 };
222             }
223              
224             #===================================
225             sub sniff {
226             #===================================
227 38     38 1 70 my $self = shift;
228 38         129 $self->logger->infof( 'Sniffing [%s]', $self->stringify );
229             return try {
230             $self->perform_request(
231             { method => 'GET',
232             path => '/_nodes/http',
233             qs => { timeout => $self->sniff_timeout . 's' },
234             timeout => $self->sniff_request_timeout,
235             }
236 38     38   3442 )->{nodes};
237             }
238             catch {
239 13     13   487 $self->logger->debug($_);
240 13         526 return;
241 38         2061 };
242             }
243              
244             #===================================
245             sub build_uri {
246             #===================================
247 5     5 1 14 my ( $self, $params ) = @_;
248 5         34 my $uri = $self->uri->clone;
249 5         43 $uri->path( $uri->path . $params->{path} );
250 5 100       239 my %qs = ( %{ $self->default_qs_params }, %{ $params->{qs} || {} } );
  5         21  
  5         26  
251 5         35 $uri->query_form( \%qs );
252 5         392 return $uri;
253             }
254              
255             #===================================
256             before 'perform_request' => sub {
257             #===================================
258             my ( $self, $params ) = @_;
259             return unless defined $params->{data};
260              
261             $self->_compress_body($params);
262              
263             my $max = $self->max_content_length
264             or return;
265              
266             return if length( $params->{data} ) < $max;
267              
268             $self->logger->throw_error( 'ContentLength',
269             "Body is longer than max_content_length ($max)",
270             );
271             };
272              
273             #===================================
274             sub _compress_body {
275             #===================================
276 0     0   0 my ( $self, $params ) = @_;
277 0         0 my $output;
278 0 0       0 if ( $self->gzip ) {
    0          
279 0 0       0 IO::Compress::Gzip::gzip( \( $params->{data} ), \$output )
280             or throw( 'Request',
281             "Couldn't gzip request: $IO::Compress::Gzip::GzipError" );
282 0         0 $params->{data} = $output;
283 0         0 $params->{encoding} = 'gzip';
284             }
285             elsif ( $self->deflate ) {
286 0 0       0 IO::Compress::Deflate::deflate( \( $params->{data} ), \$output )
287             or throw( 'Request',
288             "Couldn't deflate request: $IO::Compress::Deflate::DeflateError" );
289 0         0 $params->{data} = $output;
290 0         0 $params->{encoding} = 'deflate';
291             }
292             }
293              
294             #===================================
295             sub _decompress_body {
296             #===================================
297 238     238   445 my ( $self, $body_ref, $headers ) = @_;
298 238 50       651 if ( my $encoding = $headers->{'content-encoding'} ) {
299 0         0 my $output;
300 0 0       0 if ( $encoding eq 'gzip' ) {
    0          
301 0 0       0 IO::Uncompress::Gunzip::gunzip( $body_ref, \$output )
302             or throw(
303             'Request',
304             "Couldn't gunzip response: $IO::Uncompress::Gunzip::GunzipError"
305             );
306             }
307             elsif ( $encoding eq 'deflate' ) {
308 0 0       0 IO::Uncompress::Inflate::inflate( $body_ref, \$output,
309             Transparent => 0 )
310             or throw(
311             'Request',
312             "Couldn't inflate response: $IO::Uncompress::Inflate::InflateError"
313             );
314             }
315             else {
316 0         0 throw( 'Request', "Unknown content-encoding: $encoding" );
317             }
318 0         0 ${$body_ref} = $output;
  0         0  
319             }
320             }
321              
322             #===================================
323             sub process_response {
324             #===================================
325 238     238 1 31764 my ( $self, $params, $code, $msg, $body, $headers ) = @_;
326 238         770 $self->_decompress_body( \$body, $headers );
327              
328 238   100     983 my ($mime_type) = split /\s*;\s*/, ( $headers->{'content-type'} || '' );
329              
330 238   100     929 my $is_encoded = $mime_type && $mime_type ne 'text/plain';
331              
332             # Deprecation warnings
333 238 50       538 if ( my $warnings = $headers->{warning} ) {
334 0         0 my $warning_string = _parse_warnings($warnings);
335 0         0 my %temp = (%$params);
336 0         0 delete $temp{data};
337 0         0 $self->logger->deprecation( $warning_string, \%temp );
338             }
339              
340             # Request is successful
341              
342 238 100 66     824 if ( $code >= 200 and $code <= 209 ) {
343 176 100 100     656 if ( defined $body and length $body ) {
344 141 100       733 $body = $self->serializer->decode($body)
345             if $is_encoded;
346 141         1839 return $code, $body;
347             }
348 35 100       133 return ( $code, 1 ) if $params->{method} eq 'HEAD';
349 1         5 return ( $code, '' );
350             }
351              
352             # Check if the error should be ignored
353 62         276 my @ignore = to_list( $params->{ignore} );
354 62 100       216 push @ignore, 404 if $params->{method} eq 'HEAD';
355 62 100       167 return ($code) if grep { $_ eq $code } @ignore;
  12         53  
356              
357             # Determine error type
358 60         154 my $error_type = $Code_To_Error{$code};
359 60 100       154 unless ($error_type) {
360 49 50 33     226 if ( defined $body and length $body ) {
361 0         0 $msg = $body;
362 0         0 $body = undef;
363             }
364 49         172 $error_type = $self->error_from_text( $code, $msg );
365             }
366              
367 60 50       253 delete $params->{data} if $params->{body};
368 60         276 my %error_args = ( status_code => $code, request => $params );
369              
370             # Extract error message from the body, if present
371              
372 60 100       336 if ( $body = $self->serializer->decode($body) ) {
373 2         24 $error_args{body} = $body;
374 2   33     7 $msg = $self->_munge_elasticsearch_exception($body) || $msg;
375              
376 2 50 33     7 $error_args{current_version} = $1
377             if $error_type eq 'Conflict'
378             and $msg =~ /: version conflict, current (?:version )?\[(\d+)\]/;
379             }
380 60   33     162 $msg ||= $error_type;
381              
382 60         142 chomp $msg;
383 60         185 throw( $error_type, "[" . $self->stringify . "]-[$code] $msg",
384             \%error_args );
385             }
386              
387             #===================================
388             sub _parse_warnings {
389             #===================================
390 0 0   0   0 my @warnings = ref $_[0] eq 'ARRAY' ? @{ shift() } : shift();
  0         0  
391 0         0 my @str;
392 0         0 for (@warnings) {
393 0 0       0 if ( $_ =~ /^\d+\s+\S+\s+"((?:\\"|[^"])+)"/ ) {
394 0         0 my $msg = $1;
395 0         0 $msg =~ s/\\"/"/g, push @str, $msg;
396             }
397             else {
398 0         0 push @str, $_;
399             }
400             }
401 0         0 return join "; ", @str;
402             }
403              
404             #===================================
405             sub _munge_elasticsearch_exception {
406             #===================================
407 2     2   5 my ( $self, $body ) = @_;
408 2 50       7 return $body unless ref $body eq 'HASH';
409 2   50     7 my $error = $body->{error} || return;
410 2 50       14 return $error unless ref $error eq 'HASH';
411              
412 0   0       my $root_causes = $error->{root_cause} || [];
413 0 0         unless (@$root_causes) {
414 0 0         my $msg = "[" . $error->{type} . "] " if $error->{type};
415 0 0         $msg .= $error->{reason} if $error->{reason};
416 0           return $msg;
417             }
418              
419 0           my $json = $self->serializer;
420 0           my @msgs;
421 0           for (@$root_causes) {
422 0           my %cause = (%$_);
423             my $msg
424 0           = "[" . ( delete $cause{type} ) . "] " . ( delete $cause{reason} );
425 0 0         if ( keys %cause ) {
426 0           $msg .= ", with: " . $json->encode( \%cause );
427             }
428 0           push @msgs, $msg;
429             }
430 0           return ( join ", ", @msgs );
431             }
432              
433             1;
434              
435             # ABSTRACT: Provides common functionality to HTTP Cxn implementations
436              
437             __END__