File Coverage

blib/lib/Search/Elasticsearch/CxnPool/Async/Sniff.pm
Criterion Covered Total %
statement 77 78 98.7
branch 20 22 90.9
condition 11 11 100.0
subroutine 10 10 100.0
pod 2 2 100.0
total 120 123 97.5


line stmt bran cond sub pod time code
1             # Licensed to Elasticsearch B.V. under one or more contributor
2             # license agreements. See the NOTICE file distributed with
3             # this work for additional information regarding copyright
4             # ownership. Elasticsearch B.V. licenses this file to you under
5             # the Apache License, Version 2.0 (the "License"); you may
6             # not use this file except in compliance with the License.
7             # You may obtain a copy of the License at
8             #
9             # http://www.apache.org/licenses/LICENSE-2.0
10             #
11             # Unless required by applicable law or agreed to in writing,
12             # software distributed under the License is distributed on an
13             # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14             # KIND, either express or implied. See the License for the
15             # specific language governing permissions and limitations
16             # under the License.
17              
18             package Search::Elasticsearch::CxnPool::Async::Sniff;
19             $Search::Elasticsearch::CxnPool::Async::Sniff::VERSION = '8.00';
20 11     11   651702 use Moo;
  11         30  
  11         71  
21             with 'Search::Elasticsearch::Role::CxnPool::Sniff',
22             'Search::Elasticsearch::Role::Is_Async';
23              
24 11     11   4100 use Scalar::Util qw(weaken);
  11         30  
  11         778  
25 11     11   78 use Promises qw(deferred);
  11         45  
  11         106  
26 11     11   3261 use Search::Elasticsearch::Util qw(new_error);
  11         29  
  11         76  
27              
28 11     11   2843 use namespace::clean;
  11         29  
  11         73  
29             has 'concurrent_sniff' => ( is => 'rw', default => 4 );
30             has '_current_sniff' => ( is => 'rw', clearer => '_clear_sniff' );
31              
32             #===================================
33             sub next_cxn {
34             #===================================
35 79     79 1 610 my ( $self, $no_sniff ) = @_;
36              
37 29     29   2044 return $self->sniff->then( sub { $self->next_cxn('no_sniff') } )
38 79 100 100     565 if $self->next_sniff <= time() && !$no_sniff;
39              
40 50         133 my $cxns = $self->cxns;
41 50         111 my $total = @$cxns;
42 50         89 my $cxn;
43              
44 50         150 while ( 0 < $total-- ) {
45 49         170 $cxn = $cxns->[ $self->next_cxn_num ];
46 49 100       689 last if $cxn->is_live;
47 4         22 undef $cxn;
48             }
49              
50 50         336 my $deferred = deferred;
51              
52 50 100       714 if ($cxn) {
53 45         117 $deferred->resolve($cxn);
54             }
55             else {
56 5         37 $deferred->reject(
57             new_error(
58             "NoNodes",
59             "No nodes are available: [" . $self->cxns_seeds_str . ']'
60             )
61             );
62             }
63 50         2018 return $deferred->promise;
64             }
65              
66             #===================================
67             sub sniff {
68             #===================================
69 29     29 1 82 my $self = shift;
70              
71 29         58 my $promise;
72 29 50       124 if ( $promise = $self->_current_sniff ) {
73 0         0 return $promise;
74             }
75              
76 29         81 my $deferred = deferred;
77 29         396 my $cxns = $self->cxns;
78 29         82 my $total = @$cxns;
79 29         58 my $done = 0;
80 29         56 my $current = 0;
81 29         54 my $done_seeds = 0;
82 29         100 $promise = $self->_current_sniff( $deferred->promise );
83              
84 29         519 my ( @all, @skipped );
85              
86 29         121 while ( 0 < $total-- ) {
87 27         97 my $cxn = $cxns->[ $self->next_cxn_num ];
88 27 100       360 if ( $cxn->is_dead ) {
89 10         68 push @skipped, $cxn;
90             }
91             else {
92 17         130 push @all, $cxn;
93             }
94             }
95              
96 29         69 push @all, @skipped;
97 29 100       110 unless (@all) {
98 13         62 @all = $self->_seeds_as_cxns;
99 13         36577 $done_seeds++;
100             }
101              
102 29         72 my ( $weak_check_sniff, $cxn );
103             my $check_sniff = sub {
104              
105 54 100   54   4824 return if $done;
106 38         96 my ( $cxn, $nodes ) = @_;
107 38 100 100     208 if ( $nodes && $self->parse_sniff($nodes) ) {
108 24         34693 $done++;
109 24         506 $self->_clear_sniff;
110 24         204 return $deferred->resolve();
111             }
112              
113 14 100 100     89 unless ( @all || $done_seeds++ ) {
114 2         48 $self->logger->info("No live nodes available. Trying seed nodes.");
115 2         81 @all = $self->_seeds_as_cxns;
116             }
117              
118 14 100       1201 if ( my $cxn = shift @all ) {
119 9         29 return $cxn->sniff->done($weak_check_sniff);
120             }
121 5 50       18 if ( --$current == 0 ) {
122 5         121 $self->_clear_sniff;
123 5         43 $deferred->resolve();
124             }
125 29         182 };
126 29         126 weaken( $weak_check_sniff = $check_sniff );
127              
128 29         135 for ( 1 .. $self->concurrent_sniff ) {
129 74   100     1967 my $cxn = shift(@all) || last;
130 45         91 $current++;
131 45         181 $cxn->sniff->done($check_sniff);
132             }
133              
134 29         440 return $promise;
135             }
136              
137             #===================================
138             sub _seeds_as_cxns {
139             #===================================
140 15     15   43 my $self = shift;
141 15         61 my $factory = $self->cxn_factory;
142 15         29 return map { $factory->new_cxn($_) } @{ $self->seed_nodes };
  27         120722  
  15         76  
143             }
144              
145             1;
146              
147             # ABSTRACT: An async CxnPool for connecting to a local cluster with a dynamic node list
148              
149             __END__