File Coverage

blib/lib/Search/Elasticsearch/CxnPool/Async/Sniff.pm
Criterion Covered Total %
statement 77 78 98.7
branch 20 22 90.9
condition 11 11 100.0
subroutine 10 10 100.0
pod 2 2 100.0
total 120 123 97.5


line stmt bran cond sub pod time code
1             # Licensed to Elasticsearch B.V. under one or more contributor
2             # license agreements. See the NOTICE file distributed with
3             # this work for additional information regarding copyright
4             # ownership. Elasticsearch B.V. licenses this file to you under
5             # the Apache License, Version 2.0 (the "License"); you may
6             # not use this file except in compliance with the License.
7             # You may obtain a copy of the License at
8             #
9             # http://www.apache.org/licenses/LICENSE-2.0
10             #
11             # Unless required by applicable law or agreed to in writing,
12             # software distributed under the License is distributed on an
13             # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14             # KIND, either express or implied. See the License for the
15             # specific language governing permissions and limitations
16             # under the License.
17              
18             package Search::Elasticsearch::CxnPool::Async::Sniff;
19             $Search::Elasticsearch::CxnPool::Async::Sniff::VERSION = '7.715';
20 11     11   547423 use Moo;
  11         28  
  11         60  
21             with 'Search::Elasticsearch::Role::CxnPool::Sniff',
22             'Search::Elasticsearch::Role::Is_Async';
23              
24 11     11   3552 use Scalar::Util qw(weaken);
  11         22  
  11         774  
25 11     11   69 use Promises qw(deferred);
  11         21  
  11         102  
26 11     11   2809 use Search::Elasticsearch::Util qw(new_error);
  11         23  
  11         61  
27              
28 11     11   2383 use namespace::clean;
  11         23  
  11         63  
29             has 'concurrent_sniff' => ( is => 'rw', default => 4 );
30             has '_current_sniff' => ( is => 'rw', clearer => '_clear_sniff' );
31              
32             #===================================
33             sub next_cxn {
34             #===================================
35 79     79 1 481 my ( $self, $no_sniff ) = @_;
36              
37 29     29   1705 return $self->sniff->then( sub { $self->next_cxn('no_sniff') } )
38 79 100 100     477 if $self->next_sniff <= time() && !$no_sniff;
39              
40 50         107 my $cxns = $self->cxns;
41 50         89 my $total = @$cxns;
42 50         72 my $cxn;
43              
44 50         124 while ( 0 < $total-- ) {
45 49         146 $cxn = $cxns->[ $self->next_cxn_num ];
46 49 100       587 last if $cxn->is_live;
47 4         19 undef $cxn;
48             }
49              
50 50         269 my $deferred = deferred;
51              
52 50 100       563 if ($cxn) {
53 45         122 $deferred->resolve($cxn);
54             }
55             else {
56 5         20 $deferred->reject(
57             new_error(
58             "NoNodes",
59             "No nodes are available: [" . $self->cxns_seeds_str . ']'
60             )
61             );
62             }
63 50         1691 return $deferred->promise;
64             }
65              
66             #===================================
67             sub sniff {
68             #===================================
69 29     29 1 55 my $self = shift;
70              
71 29         52 my $promise;
72 29 50       108 if ( $promise = $self->_current_sniff ) {
73 0         0 return $promise;
74             }
75              
76 29         71 my $deferred = deferred;
77 29         349 my $cxns = $self->cxns;
78 29         53 my $total = @$cxns;
79 29         44 my $done = 0;
80 29         51 my $current = 0;
81 29         47 my $done_seeds = 0;
82 29         93 $promise = $self->_current_sniff( $deferred->promise );
83              
84 29         430 my ( @all, @skipped );
85              
86 29         92 while ( 0 < $total-- ) {
87 27         71 my $cxn = $cxns->[ $self->next_cxn_num ];
88 27 100       308 if ( $cxn->is_dead ) {
89 10         61 push @skipped, $cxn;
90             }
91             else {
92 17         132 push @all, $cxn;
93             }
94             }
95              
96 29         80 push @all, @skipped;
97 29 100       94 unless (@all) {
98 13         52 @all = $self->_seeds_as_cxns;
99 13         30304 $done_seeds++;
100             }
101              
102 29         64 my ( $weak_check_sniff, $cxn );
103             my $check_sniff = sub {
104              
105 54 100   54   4095 return if $done;
106 38         74 my ( $cxn, $nodes ) = @_;
107 38 100 100     160 if ( $nodes && $self->parse_sniff($nodes) ) {
108 24         27981 $done++;
109 24         393 $self->_clear_sniff;
110 24         163 return $deferred->resolve();
111             }
112              
113 14 100 100     91 unless ( @all || $done_seeds++ ) {
114 2         42 $self->logger->info("No live nodes available. Trying seed nodes.");
115 2         71 @all = $self->_seeds_as_cxns;
116             }
117              
118 14 100       1060 if ( my $cxn = shift @all ) {
119 9         31 return $cxn->sniff->done($weak_check_sniff);
120             }
121 5 50       17 if ( --$current == 0 ) {
122 5         109 $self->_clear_sniff;
123 5         34 $deferred->resolve();
124             }
125 29         142 };
126 29         121 weaken( $weak_check_sniff = $check_sniff );
127              
128 29         140 for ( 1 .. $self->concurrent_sniff ) {
129 74   100     1656 my $cxn = shift(@all) || last;
130 45         74 $current++;
131 45         165 $cxn->sniff->done($check_sniff);
132             }
133              
134 29         346 return $promise;
135             }
136              
137             #===================================
138             sub _seeds_as_cxns {
139             #===================================
140 15     15   33 my $self = shift;
141 15         58 my $factory = $self->cxn_factory;
142 15         27 return map { $factory->new_cxn($_) } @{ $self->seed_nodes };
  27         99724  
  15         74  
143             }
144              
145             1;
146              
147             # ABSTRACT: An async CxnPool for connecting to a local cluster with a dynamic node list
148              
149             __END__