File Coverage

blib/lib/Elasticsearch/Role/CxnPool.pm
Criterion Covered Total %
statement 65 65 100.0
branch 13 14 92.8
condition n/a
subroutine 17 17 100.0
pod 9 9 100.0
total 104 105 99.0


line stmt bran cond sub pod time code
1             package Elasticsearch::Role::CxnPool;
2             $Elasticsearch::Role::CxnPool::VERSION = '1.05';
3 42     42   33431 use Moo::Role;
  42         97  
  42         327  
4 42     42   16744 use Elasticsearch::Util qw(parse_params);
  42         101  
  42         571  
5 42     42   13622 use List::Util qw(shuffle);
  42         100  
  42         3750  
6 42     42   47000 use IO::Select();
  42         94967  
  42         1189  
7 42     42   55758 use Time::HiRes qw(time sleep);
  42         98549  
  42         483  
8 42     42   10729 use Elasticsearch::Util qw(to_list);
  42         108  
  42         555  
9 42     42   14834 use namespace::clean;
  42         102  
  42         433  
10              
11             requires qw(next_cxn schedule_check);
12              
13             has 'cxn_factory' => ( is => 'ro', required => 1 );
14             has 'logger' => ( is => 'ro', required => 1 );
15             has 'serializer' => ( is => 'ro', required => 1 );
16             has 'current_cxn_num' => ( is => 'rwp', default => 0 );
17             has 'cxns' => ( is => 'rwp', default => sub { [] } );
18             has 'seed_nodes' => ( is => 'ro', required => 1 );
19             has 'retries' => ( is => 'rw', default => 0 );
20             has 'randomize_cxns' => ( is => 'ro', default => 1 );
21              
22             #===================================
23             around BUILDARGS => sub {
24             #===================================
25             my $orig = shift;
26             my $params = $orig->(@_);
27             my @seed = grep {$_} to_list( delete $params->{nodes} || ('') );
28              
29             @seed = $params->{cxn_factory}->default_host
30             unless @seed;
31             $params->{seed_nodes} = \@seed;
32             return $params;
33             };
34              
35             #===================================
36             sub next_cxn_num {
37             #===================================
38 200     200 1 317 my $self = shift;
39 200         510 my $cxns = $self->cxns;
40 200 50       661 return unless @$cxns;
41 200         858 my $current = $self->current_cxn_num;
42 200         615 $self->_set_current_cxn_num( ( $current + 1 ) % @$cxns );
43 200         914 return $current;
44             }
45              
46             #===================================
47             sub set_cxns {
48             #===================================
49 83     83 1 208 my $self = shift;
50 83         334 my $factory = $self->cxn_factory;
51 83         222 my @cxns = map { $factory->new_cxn($_) } @_;
  126         8745  
52 83 100       11565 @cxns = shuffle @cxns if $self->randomize_cxns;
53 83         437 $self->_set_cxns( \@cxns );
54 83         360 $self->_set_current_cxn_num(0);
55              
56 126         895 $self->logger->infof( "Current cxns: %s",
57 83         364 [ map { $_->stringify } @cxns ] );
58              
59 83         16274 return;
60             }
61              
62             #===================================
63             sub request_ok {
64             #===================================
65 114     114 1 223 my ( $self, $cxn ) = @_;
66 114         406 $cxn->mark_live;
67 114         1510 $self->reset_retries;
68             }
69              
70             #===================================
71             sub request_failed {
72             #===================================
73 47     47 1 114 my ( $self, $cxn, $error ) = @_;
74              
75 47 100       344 if ( $error->is( 'Cxn', 'Timeout' ) ) {
76 33 100       168 $cxn->mark_dead if $self->should_mark_dead($error);
77 33         4960 $self->schedule_check;
78              
79 33 100       144 if ( $self->should_retry($error) ) {
80 25         129 my $retries = $self->retries( $self->retries + 1 );
81 25 100       190 return 1 if $retries < $self->_max_retries;
82             }
83             }
84             else {
85 14 100       48 $cxn->mark_live if $cxn;
86             }
87 28         259 $self->reset_retries;
88 28         141 return 0;
89             }
90              
91             #===================================
92             sub should_retry {
93             #===================================
94 33     33 1 78 my ( $self, $error ) = @_;
95 33         145 return $error->is('Cxn');
96             }
97              
98             #===================================
99             sub should_mark_dead {
100             #===================================
101 21     21 1 50 my ( $self, $error ) = @_;
102 21         119 return $error->is('Cxn');
103             }
104              
105             #===================================
106             sub cxns_str {
107             #===================================
108 7     7 1 16 my $self = shift;
109 7         15 join ", ", map { $_->stringify } @{ $self->cxns };
  12         64  
  7         31  
110             }
111              
112             #===================================
113             sub cxns_seeds_str {
114             #===================================
115 6     6 1 14 my $self = shift;
116 4         23 join ", ", ( map { $_->stringify } @{ $self->cxns } ),
  6         22  
  6         66  
117 6         15 @{ $self->seed_nodes };
118             }
119              
120             #===================================
121 142     142 1 1871 sub reset_retries { shift->retries(0) }
122 14     14   118 sub _max_retries {2}
123             #===================================
124              
125             1;
126              
127             =pod
128              
129             =encoding UTF-8
130              
131             =head1 NAME
132              
133             Elasticsearch::Role::CxnPool - Provides common functionality to the CxnPool implementations
134              
135             =head1 VERSION
136              
137             version 1.05
138              
139             =head1 DESCRIPTION
140              
141             See the CxnPool implementations:
142              
143             =over
144              
145             =item *
146              
147             L<Elasticsearch::CxnPool::Static>
148              
149             =item *
150              
151             L<Elasticsearch::CxnPool::Sniff>
152              
153             =item *
154              
155             L<Elasticsearch::CxnPool::Static::NoPing>
156              
157             =back
158              
159             =head1 CONFIGURATION
160              
161             These configuration options should not be set by the user but are
162             documented here for completeness.
163              
164             =head2 C<randomize_cxns>
165              
166             By default, the order of cxns passed to L</set_cxns()> is randomized
167             before they are stored. Set C<randomize_cxns> to a false value to
168             disable.
169              
170             =head1 METHODS
171              
172             =head2 C<cxn_factory()>
173              
174             $factory = $cxn_pool->cxn_factory
175              
176             Returns the L<Elasticsearch::Cxn::Factory> object for creating a new
177             C<$cxn> instance.
178              
179             =head2 C<logger()>
180              
181             $logger = $cxn_pool->logger
182              
183             Returns the L<Elasticsearch::Role::Logger>-based object, which
184             defaults to L<Elasticsearch::Logger::LogAny>.
185              
186             =head2 C<serializer()>
187              
188             $serializer = $cxn_pool->serializer
189              
190             Returns the L<Elasticsearch::Role::Serializer>-based object,
191             which defaults to L<Elasticsearch::Serializer::JSON>.
192              
193             =head2 C<current_cxn_num()>
194              
195             $num = $cxn_pool->current_cxn_num
196              
197             Returns the current cxn number, which is an offset into
198             the array of cxns set by L</set_cxns()>.
199              
200             =head2 C<cxns()>
201              
202             \@cxns = $cxn_pool->cxns;
203              
204             Returns the current list of L<Elasticsearch::Role::Cxn>-based
205             cxn objects as set by L</set_cxns()>.
206              
207             =head2 C<seed_nodes()>
208              
209             \@seed_nodes = $cxn_pool->seed_nodes
210              
211             Returns the list of C<nodes> originally specified when calling
212             L<Elasticsearch/new()>.
213              
214             =head2 C<next_cxn_num()>
215              
216             $num = $cxn_pool->next_cxn_num;
217              
218             Returns the number of the next connection, in round-robin fashion. Updates
219             the L</current_cxn_num()>.
220              
221             =head2 C<set_cxns()>
222              
223             $cxn_pool->set_cxns(@nodes);
224              
225             Takes a list of nodes, converts them into L<Elasticsearch::Role::Cxn>-based
226             objects and makes them accessible via L</cxns()>.
227              
228             =head2 C<request_ok()>
229              
230             $cxn_pool->request_ok($cxn);
231              
232             Called when a request by the specified C<$cxn> object has completed successfully.
233             Marks the C<$cxn> as live.
234              
235             =head2 C<request_failed()>
236              
237             $should_retry = $cxn_pool->request_failed($cxn,$error);
238              
239             Called when a request by the specified C<$cxn> object has failed. Returns
240             C<1> if the request should be retried or C<0> if it shouldn't.
241              
242             =head2 C<should_retry()>
243              
244             $bool = $cxn_pool->should_retry($error);
245              
246             Examines the error to decide whether the request should be retried or not.
247             By default, only L<Elasticsearch::Error/Elasticsearch::Error::Cxn> errors
248             are retried.
249              
250             =head2 C<should_mark_dead()>
251              
252             $bool = $cxn_pool->should_mark_dead($error);
253              
254             Examines the error to decide whether the C<$cxn> should be marked as dead or not.
255             By default, only L<Elasticsearch::Error/Elasticsearch::Error::Cxn> errors
256             cause a C<$cxn> to be marked as dead.
257              
258             =head2 C<cxns_str()>
259              
260             $str = $cxn_pool->cxns_str
261              
262             Returns all L</cxns()> as a string for logging purposes.
263              
264             =head2 C<cxns_seeds_str()>
265              
266             $str = $cxn_pool->cxns_seeeds_str
267              
268             Returns all L</cxns()> and L</seed_nodes()> as a string for logging purposes.
269              
270             =head2 C<retries()>
271              
272             $retries = $cxn_pool->retries
273              
274             The number of times the current request has been retried.
275              
276             =head2 C<reset_retries()>
277              
278             $cxn_pool->reset_retries;
279              
280             Called at the start of a new request to reset the retries count.
281              
282             =head1 AUTHOR
283              
284             Clinton Gormley <drtech@cpan.org>
285              
286             =head1 COPYRIGHT AND LICENSE
287              
288             This software is Copyright (c) 2014 by Elasticsearch BV.
289              
290             This is free software, licensed under:
291              
292             The Apache License, Version 2.0, January 2004
293              
294             =cut
295              
296             __END__
297              
298             #ABSTRACT: Provides common functionality to the CxnPool implementations
299              
300