File Coverage

blib/lib/Search/Elasticsearch/Role/CxnPool.pm
Criterion Covered Total %
statement 65 65 100.0
branch 13 14 92.8
condition n/a
subroutine 17 17 100.0
pod 9 9 100.0
total 104 105 99.0


line stmt bran cond sub pod time code
1             # Licensed to Elasticsearch B.V. under one or more contributor
2             # license agreements. See the NOTICE file distributed with
3             # this work for additional information regarding copyright
4             # ownership. Elasticsearch B.V. licenses this file to you under
5             # the Apache License, Version 2.0 (the "License"); you may
6             # not use this file except in compliance with the License.
7             # You may obtain a copy of the License at
8             #
9             # http://www.apache.org/licenses/LICENSE-2.0
10             #
11             # Unless required by applicable law or agreed to in writing,
12             # software distributed under the License is distributed on an
13             # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14             # KIND, either express or implied. See the License for the
15             # specific language governing permissions and limitations
16             # under the License.
17              
18             package Search::Elasticsearch::Role::CxnPool;
19             $Search::Elasticsearch::Role::CxnPool::VERSION = '8.00';
20 55     55   26830 use Moo::Role;
  55         128  
  55         423  
21 55     55   17566 use Search::Elasticsearch::Util qw(parse_params);
  55         146  
  55         334  
22 55     55   13318 use List::Util qw(shuffle);
  55         116  
  55         3272  
23 55     55   22875 use IO::Select();
  55         84608  
  55         1446  
24 55     55   23835 use Time::HiRes qw(time sleep);
  55         70354  
  55         249  
25 55     55   9620 use Search::Elasticsearch::Util qw(to_list);
  55         125  
  55         348  
26 55     55   13059 use namespace::clean;
  55         121  
  55         303  
27              
28             requires qw(next_cxn schedule_check);
29              
30             has 'cxn_factory' => ( is => 'ro', required => 1 );
31             has 'logger' => ( is => 'ro', required => 1 );
32             has 'serializer' => ( is => 'ro', required => 1 );
33             has 'current_cxn_num' => ( is => 'rwp', default => 0 );
34             has 'cxns' => ( is => 'rwp', default => sub { [] } );
35             has 'seed_nodes' => ( is => 'ro', required => 1 );
36             has 'retries' => ( is => 'rw', default => 0 );
37             has 'randomize_cxns' => ( is => 'ro', default => 1 );
38              
39             #===================================
40             around BUILDARGS => sub {
41             #===================================
42             my $orig = shift;
43             my $params = $orig->(@_);
44             my @seed = grep {$_} to_list( delete $params->{nodes} || ('') );
45              
46             @seed = $params->{cxn_factory}->default_host
47             unless @seed;
48             $params->{seed_nodes} = \@seed;
49             return $params;
50             };
51              
52             #===================================
53             sub next_cxn_num {
54             #===================================
55 200     200 1 349 my $self = shift;
56 200         363 my $cxns = $self->cxns;
57 200 50       440 return unless @$cxns;
58 200         370 my $current = $self->current_cxn_num;
59 200         514 $self->_set_current_cxn_num( ( $current + 1 ) % @$cxns );
60 200         428 return $current;
61             }
62              
63             #===================================
64             sub set_cxns {
65             #===================================
66 112     112 1 211 my $self = shift;
67 112         271 my $factory = $self->cxn_factory;
68 112         242 my @cxns = map { $factory->new_cxn($_) } @_;
  155         32787  
69 112 100       14065 @cxns = shuffle @cxns if $self->randomize_cxns;
70 112         515 $self->_set_cxns( \@cxns );
71 112         322 $self->_set_current_cxn_num(0);
72              
73             $self->logger->infof( "Current cxns: %s",
74 112         344 [ map { $_->stringify } @cxns ] );
  155         695  
75              
76 112         53672 return;
77             }
78              
79             #===================================
80             sub request_ok {
81             #===================================
82 114     114 1 230 my ( $self, $cxn ) = @_;
83 114         331 $cxn->mark_live;
84 114         1028 $self->reset_retries;
85             }
86              
87             #===================================
88             sub request_failed {
89             #===================================
90 46     46 1 133 my ( $self, $cxn, $error ) = @_;
91              
92 46 100       217 if ( $error->is( 'Cxn', 'Timeout' ) ) {
93 33 100       117 $cxn->mark_dead if $self->should_mark_dead($error);
94 33         2401 $self->schedule_check;
95              
96 33 100       97 if ( $self->should_retry($error) ) {
97 25         101 my $retries = $self->retries( $self->retries + 1 );
98 25 100       72 return 1 if $retries < $self->_max_retries;
99             }
100             }
101             else {
102 13 100       58 $cxn->mark_live if $cxn;
103             }
104 27         143 $self->reset_retries;
105 27         91 return 0;
106             }
107              
108             #===================================
109             sub should_retry {
110             #===================================
111 33     33 1 84 my ( $self, $error ) = @_;
112 33         106 return $error->is('Cxn');
113             }
114              
115             #===================================
116             sub should_mark_dead {
117             #===================================
118 21     21 1 52 my ( $self, $error ) = @_;
119 21         55 return $error->is('Cxn');
120             }
121              
122             #===================================
123             sub cxns_str {
124             #===================================
125 7     7 1 15 my $self = shift;
126 7         12 join ", ", map { $_->stringify } @{ $self->cxns };
  12         45  
  7         22  
127             }
128              
129             #===================================
130             sub cxns_seeds_str {
131             #===================================
132 5     5 1 23 my $self = shift;
133 4         20 join ", ", ( map { $_->stringify } @{ $self->cxns } ),
  5         18  
134 5         13 @{ $self->seed_nodes };
  5         48  
135             }
136              
137             #===================================
138 141     141 1 1260 sub reset_retries { shift->retries(0) }
139 14     14   62 sub _max_retries {2}
140             #===================================
141              
142             1;
143              
144             =pod
145              
146             =encoding UTF-8
147              
148             =head1 NAME
149              
150             Search::Elasticsearch::Role::CxnPool - Provides common functionality to the CxnPool implementations
151              
152             =head1 VERSION
153              
154             version 8.00
155              
156             =head1 DESCRIPTION
157              
158             See the CxnPool implementations:
159              
160             =over
161              
162             =item *
163              
164             L
165              
166             =item *
167              
168             L
169              
170             =item *
171              
172             L
173              
174             =back
175              
176             =head1 CONFIGURATION
177              
178             These configuration options should not be set by the user but are
179             documented here for completeness.
180              
181             =head2 C
182              
183             By default, the order of cxns passed to L is randomized
184             before they are stored. Set C to a false value to
185             disable.
186              
187             =head1 METHODS
188              
189             =head2 C
190              
191             $factory = $cxn_pool->cxn_factory
192              
193             Returns the L object for creating a new
194             C<$cxn> instance.
195              
196             =head2 C
197              
198             $logger = $cxn_pool->logger
199              
200             Returns the L-based object, which
201             defaults to L.
202              
203             =head2 C
204              
205             $serializer = $cxn_pool->serializer
206              
207             Returns the L-based object,
208             which defaults to L.
209              
210             =head2 C
211              
212             $num = $cxn_pool->current_cxn_num
213              
214             Returns the current cxn number, which is an offset into
215             the array of cxns set by L.
216              
217             =head2 C
218              
219             \@cxns = $cxn_pool->cxns;
220              
221             Returns the current list of L-based
222             cxn objects as set by L.
223              
224             =head2 C
225              
226             \@seed_nodes = $cxn_pool->seed_nodes
227              
228             Returns the list of C originally specified when calling
229             L.
230              
231             =head2 C
232              
233             $num = $cxn_pool->next_cxn_num;
234              
235             Returns the number of the next connection, in round-robin fashion. Updates
236             the L.
237              
238             =head2 C
239              
240             $cxn_pool->set_cxns(@nodes);
241              
242             Takes a list of nodes, converts them into L-based
243             objects and makes them accessible via L.
244              
245             =head2 C
246              
247             $cxn_pool->request_ok($cxn);
248              
249             Called when a request by the specified C<$cxn> object has completed successfully.
250             Marks the C<$cxn> as live.
251              
252             =head2 C
253              
254             $should_retry = $cxn_pool->request_failed($cxn,$error);
255              
256             Called when a request by the specified C<$cxn> object has failed. Returns
257             C<1> if the request should be retried or C<0> if it shouldn't.
258              
259             =head2 C
260              
261             $bool = $cxn_pool->should_retry($error);
262              
263             Examines the error to decide whether the request should be retried or not.
264             By default, only L errors
265             are retried.
266              
267             =head2 C
268              
269             $bool = $cxn_pool->should_mark_dead($error);
270              
271             Examines the error to decide whether the C<$cxn> should be marked as dead or not.
272             By default, only L errors
273             cause a C<$cxn> to be marked as dead.
274              
275             =head2 C
276              
277             $str = $cxn_pool->cxns_str
278              
279             Returns all L as a string for logging purposes.
280              
281             =head2 C
282              
283             $str = $cxn_pool->cxns_seeeds_str
284              
285             Returns all L and L as a string for logging purposes.
286              
287             =head2 C
288              
289             $retries = $cxn_pool->retries
290              
291             The number of times the current request has been retried.
292              
293             =head2 C
294              
295             $cxn_pool->reset_retries;
296              
297             Called at the start of a new request to reset the retries count.
298              
299             =head1 AUTHOR
300              
301             Enrico Zimuel
302              
303             =head1 COPYRIGHT AND LICENSE
304              
305             This software is Copyright (c) 2022 by Elasticsearch BV.
306              
307             This is free software, licensed under:
308              
309             The Apache License, Version 2.0, January 2004
310              
311             =cut
312              
313             __END__