File Coverage

blib/lib/Elasticsearch/Role/Cxn.pm
Criterion Covered Total %
statement 78 81 96.3
branch 21 24 87.5
condition 12 18 66.6
subroutine 19 19 100.0
pod 8 8 100.0
total 138 150 92.0


line stmt bran cond sub pod time code
1             package Elasticsearch::Role::Cxn;
2             $Elasticsearch::Role::Cxn::VERSION = '1.05';
3 42     42   495648 use Moo::Role;
  42         109  
  42         332  
4 42     42   18365 use Elasticsearch::Util qw(throw);
  42         127  
  42         468  
5 42     42   18884 use List::Util qw(min);
  42         97  
  42         7424  
6 42     42   264 use Try::Tiny;
  42         95  
  42         3251  
7 42     42   30058 use URI();
  42         187323  
  42         1248  
8 42     42   378 use Elasticsearch::Util qw(to_list);
  42         90  
  42         388  
9 42     42   13235 use namespace::clean;
  42         97  
  42         455  
10              
11             requires qw(protocol perform_request error_from_text);
12              
13             has 'host' => ( is => 'ro', required => 1 );
14             has 'port' => ( is => 'ro', required => 1 );
15             has 'uri' => ( is => 'ro', required => 1 );
16             has 'request_timeout' => ( is => 'ro', default => 30 );
17             has 'ping_timeout' => ( is => 'ro', default => 2 );
18             has 'sniff_timeout' => ( is => 'ro', default => 1 );
19             has 'sniff_request_timeout' => ( is => 'ro', default => 2 );
20             has 'next_ping' => ( is => 'rw', default => 0 );
21             has 'ping_failures' => ( is => 'rw', default => 0 );
22             has 'dead_timeout' => ( is => 'ro', default => 60 );
23             has 'max_dead_timeout' => ( is => 'ro', default => 3600 );
24             has 'serializer' => ( is => 'ro', required => 1 );
25             has 'logger' => ( is => 'ro', required => 1 );
26             has 'handle_args' => ( is => 'ro', default => sub { {} } );
27              
28             my %Code_To_Error = (
29             400 => 'Request',
30             403 => 'ClusterBlocked',
31             404 => 'Missing',
32             409 => 'Conflict',
33             503 => 'Unavailable'
34             );
35              
36             #===================================
37 182     182 1 1369 sub is_live { !shift->next_ping }
38 99     99 1 2512 sub is_dead { !!shift->next_ping }
39             #===================================
40              
41             #===================================
42             sub mark_live {
43             #===================================
44 149     149 1 1303 my $self = shift;
45 149         418 $self->ping_failures(0);
46 149         487 $self->next_ping(0);
47             }
48              
49             #===================================
50             sub mark_dead {
51             #===================================
52 143     143 1 8391 my $self = shift;
53 143         338 my $fails = $self->ping_failures;
54 143         325 $self->ping_failures( $fails + 1 );
55              
56 143         933 my $timeout
57             = min( $self->dead_timeout * 2**$fails, $self->max_dead_timeout );
58 143         412 my $next = $self->next_ping( time() + $timeout );
59              
60 143         542 $self->logger->infof( 'Marking [%s] as dead. Next ping at: %s',
61             $self->stringify, scalar localtime($next) );
62              
63             }
64              
65             #===================================
66             sub force_ping {
67             #===================================
68 92     92 1 224 my $self = shift;
69 92         351 $self->ping_failures(0);
70 92         1654 $self->next_ping(-1);
71             }
72              
73             #===================================
74             sub pings_ok {
75             #===================================
76 47     47 1 82 my $self = shift;
77 47         226 $self->logger->infof( 'Pinging [%s]', $self->stringify );
78             return try {
79 47     47   4715 $self->perform_request(
80             { method => 'HEAD',
81             path => '/',
82             timeout => $self->ping_timeout,
83             }
84             );
85 33         234 $self->logger->infof( 'Marking [%s] as live', $self->stringify );
86 33         2145 $self->mark_live;
87 33         91 1;
88             }
89             catch {
90 14     14   1047 $self->logger->debug("$_");
91 14         3696 $self->mark_dead;
92 14         2506 0;
93 47         4318 };
94             }
95              
96             #===================================
97             sub sniff {
98             #===================================
99 39     39 1 78 my $self = shift;
100 39         126 my $protocol = $self->protocol;
101 39         192 $self->logger->infof( 'Sniffing [%s]', $self->stringify );
102             return try {
103 39     39   5758 $self->perform_request(
104             { method => 'GET',
105             path => '/_nodes/' . $protocol,
106             qs => { timeout => 1000 * $self->sniff_timeout },
107             timeout => $self->sniff_request_timeout,
108             }
109             )->{nodes};
110             }
111             catch {
112 13     13   306 $self->logger->debug($_);
113 13         2283 return;
114 39         4963 };
115             }
116              
117             #===================================
118             sub process_response {
119             #===================================
120 240     240 1 595 my ( $self, $params, $code, $msg, $body, $mime_type ) = @_;
121              
122 240   100     1193 my $is_encoded = $mime_type && $mime_type ne 'text/plain';
123              
124 240 100 66     1293 if ( $code >= 200 and $code <= 209 ) {
125 177 100 100     975 if ( defined $body and length $body ) {
126 142 100       1094 $body = $self->serializer->decode($body)
127             if $is_encoded;
128 142         4619 return $code, $body;
129             }
130 35 100       289 return ( $code, 1 ) if $params->{method} eq 'HEAD';
131 1         7 return ( $code, '' );
132             }
133              
134 63         340 my @ignore = to_list( $params->{ignore} );
135 63 100       302 push @ignore, 404 if $params->{method} eq 'HEAD';
136 63 100       225 return ($code) if grep { $_ eq $code } @ignore;
  12         65  
137              
138 61         190 my $error_type = $Code_To_Error{$code};
139 61 100       187 unless ($error_type) {
140 49 50       161 if ( defined $body ) {
141 0         0 $msg = $body;
142 0         0 $body = undef;
143             }
144 49         225 $error_type = $self->error_from_text( $code, $msg );
145             }
146              
147 61 50       438 delete $params->{data} if $params->{body};
148 61         340 my %error_args = ( status_code => $code, request => $params );
149              
150 61 100       408 if ( $body = $self->serializer->decode($body) ) {
151 3         43 $error_args{body} = $body;
152 3 50       13 if ( ref $body ) {
153 3   33     20 $msg = $body->{error} || $msg;
154             }
155             else {
156 0         0 $msg = $body;
157             }
158              
159 3 100 66     23 $error_args{current_version} = $1
160             if $error_type eq 'Conflict'
161             and $msg =~ /: version conflict, current \[(\d+)\]/;
162             }
163 61   33     215 $msg ||= $error_type;
164              
165 61         164 chomp $msg;
166 61         327 throw( $error_type, "[" . $self->stringify . "]-[$code] $msg",
167             \%error_args );
168             }
169              
170             1;
171              
172             # ABSTRACT: Provides common functionality to Cxn implementations
173              
174             __END__
175              
176             =pod
177              
178             =encoding UTF-8
179              
180             =head1 NAME
181              
182             Elasticsearch::Role::Cxn - Provides common functionality to Cxn implementations
183              
184             =head1 VERSION
185              
186             version 1.05
187              
188             =head1 DESCRIPTION
189              
190             L<Elasticsearch::Role::Cxn> provides common functionality to the Cxn
191             implementations. Cxn instances are created by a L<Elasticsearch::Role::CxnPool>
192             implementation, using the L<Elasticsearch::Cxn::Factory> class.
193              
194             =head1 CONFIGURATION
195              
196             B<IMPORTANT:> The L</request_timeout>, L</ping_timeout>, L</sniff_timeout>,
197             and L</sniff_request_timeout> parameters default to values that allow
198             this module to function with low powered hardware and slow networks.
199             When you use Elasticsearch in production, you will probably want to reduce
200             these timeout parameters to values that suit your environment.
201              
202             The configuration parameters are as follows:
203              
204             =head2 C<request_timeout>
205              
206             $e = Elasticsearch->new(
207             request_timeout => 30
208             );
209              
210             How long a normal request (ie not a ping or sniff request) should wait
211             before throwing a C<Timeout> error. Defaults to C<30> seconds.
212              
213             B<Note:> In production, no request should take 30 seconds to run, other
214             than an L<optimize()/Elasticsearch::Client::Direct/optimize()> request.
215             A more reasonable value for production would be C<10> seconds or lower.
216              
217             =head2 C<ping_timeout>
218              
219             $e = Elasticsearch->new(
220             ping_timeout => 2
221             );
222              
223             How long a ping request should wait before throwing a C<Timeout> error.
224             Defaults to C<2> seconds. The L<Elasticsearch::CxnPool::Static> module
225             pings nodes on first use, after any failure, and periodically to ensure
226             that nodes are healthy. The C<ping_timeout> should be long enough to allow
227             nodes respond in time, but not so long that sick nodes cause delays.
228             A reasonable value for use in production on reasonable hardware
229             would be C<0.3>-C<1> seconds.
230              
231             =head2 C<dead_timeout>
232              
233             $e = Elasticsearch->new(
234             dead_timeout => 60
235             );
236              
237             How long a Cxn should be considered to be I<dead> (not used to serve requests),
238             before it is retried. The default is C<60> seconds. This value is increased
239             by powers of 2 for each time a request fails. In other words, the delay
240             after each failure is as follows:
241              
242             Failure Delay
243             1 60 * 1 = 60 seconds
244             2 60 * 2 = 120 seconds
245             3 60 * 4 = 240 seconds
246             4 60 * 8 = 480 seconds
247             5 60 * 16 = 960 seconds
248              
249             =head2 C<max_dead_timeout>
250              
251             $e = Elasticsearch->new(
252             max_dead_timeout => 3600
253             );
254              
255             The maximum delay that should be applied to a failed node. If the
256             L</dead_timeout> calculation results in a delay greater than
257             C<max_dead_timeout> (default C<3,600> seconds) then the C<max_dead_timeout>
258             is used instead. In other words, dead nodes will be retried at least once
259             every hour by default.
260              
261             =head2 C<sniff_request_timeout>
262              
263             $e = Elasticsearch->new(
264             sniff_request_timeout => 2
265             );
266              
267             How long a sniff request should wait before throwing a C<Timeout> error.
268             Defaults to C<2> seconds. A reasonable value for production would be
269             C<0.5>-C<2> seconds.
270              
271             =head2 C<sniff_timeout>
272              
273             $e = Elasticsearch->new(
274             sniff_timeout => 1
275             );
276              
277             How long the node being sniffed should wait for responses from other nodes
278             before responding to the client. Defaults to C<1> second. A reasonable
279             value in production would be C<0.3>-C<1> seconds.
280              
281             B<Note:> The C<sniff_timeout> is distinct from the L</sniff_request_timeout>.
282             For example, let's say you have a cluster with 5 nodes, 2 of which are
283             unhealthy (taking a long time to respond):
284              
285             =over
286              
287             =item *
288              
289             If you sniff an unhealthy node, the request will throw a C<Timeout> error
290             after C<sniff_request_timeout> seconds.
291              
292             =item *
293              
294             If you sniff a healthy node, it will gather responses from the other nodes,
295             and give up after C<sniff_timeout> seconds, returning just the information it
296             has managed to gather from the healthy nodes.
297              
298             =back
299              
300             B<NOTE:> The C<sniff_request_timeout> must be longer than the C<sniff_timeout>
301             to ensure that you get information about healthy nodes from the cluster.
302              
303             =head2 C<handle_args>
304              
305             Any default arguments which should be passed when creating a new instance of
306             the class which handles the network transport, eg L<HTTP::Tiny>.
307              
308             =head1 METHODS
309              
310             None of the methods listed below are useful to the user. They are
311             documented for those who are writing alternative implementations only.
312              
313             =head2 C<host()>
314              
315             $host = $cxn->host;
316              
317             The value of the C<host> parameter, eg C<search.domain.com>.
318              
319             =head2 C<port()>
320              
321             $port = $cxn->port;
322              
323             The value of the C<port> parameter, eg C<9200>.
324              
325             =head2 C<uri()>
326              
327             $uri = $cxn->uri;
328              
329             A L<URI> object representing the node, eg C<https://search.domain.com:9200/path>.
330              
331             =head2 C<is_dead()>
332              
333             $bool = $cxn->is_dead
334              
335             Is the current node marked as I<dead>.
336              
337             =head2 C<is_live()>
338              
339             $bool = $cxn->is_live
340              
341             Is the current node marked as I<live>.
342              
343             =head2 C<next_ping()>
344              
345             $time = $cxn->next_ping($time)
346              
347             Get/set the time for the next scheduled ping. If zero, no ping is scheduled
348             and the cxn is considered to be alive. If -1, a ping is scheduled before
349             the next use.
350              
351             =head2 C<ping_failures()>
352              
353             $num = $cxn->ping_failures($num)
354              
355             The number of times that a cxn has been marked as dead.
356              
357             =head2 C<mark_dead()>
358              
359             $cxn->mark_dead
360              
361             Mark the cxn as I<dead>, set L</next_ping()> and increment L</ping_failures()>.
362              
363             =head2 C<mark_live()>
364              
365             Mark the cxn as I<live>, set L</next_ping()> and L</ping_failures()> to zero.
366              
367             =head2 C<force_ping()>
368              
369             Set L</next_ping()> to -1 (ie before next use) and L</ping_failures()> to zero.
370              
371             =head2 C<pings_ok()>
372              
373             $bool = $cxn->pings_ok
374              
375             Try to ping the node and call L</mark_live()> or L</mark_dead()> depending on
376             the success or failure of the ping.
377              
378             =head2 C<sniff()>
379              
380             $response = $cxn->sniff;
381              
382             Send a sniff request to the node and return the response.
383              
384             =head2 C<process_response()>
385              
386             ($code,$result) = $cxn->process_response($params, $code, $msg, $body );
387              
388             Processes the response received from an Elasticsearch node and either
389             returns the HTTP status code and the response body (deserialized from JSON)
390             or throws an error of the appropriate type.
391              
392             The C<$params> are the original params passed to
393             L<Elasticsearch::Transport/perform_request()>, the C<$code> is the HTTP
394             status code, the C<$msg> is the error message returned by the backend
395             library and the C<$body> is the HTTP response body returned by
396             Elasticsearch.
397              
398             =head1 AUTHOR
399              
400             Clinton Gormley <drtech@cpan.org>
401              
402             =head1 COPYRIGHT AND LICENSE
403              
404             This software is Copyright (c) 2014 by Elasticsearch BV.
405              
406             This is free software, licensed under:
407              
408             The Apache License, Version 2.0, January 2004
409              
410             =cut