File Coverage

blib/lib/Search/Elasticsearch/CxnPool/Async/Static.pm
Criterion Covered Total %
statement 40 40 100.0
branch 10 10 100.0
condition 3 3 100.0
subroutine 7 7 100.0
pod 1 1 100.0
total 61 61 100.0


line stmt bran cond sub pod time code
1             # Licensed to Elasticsearch B.V. under one or more contributor
2             # license agreements. See the NOTICE file distributed with
3             # this work for additional information regarding copyright
4             # ownership. Elasticsearch B.V. licenses this file to you under
5             # the Apache License, Version 2.0 (the "License"); you may
6             # not use this file except in compliance with the License.
7             # You may obtain a copy of the License at
8             #
9             # http://www.apache.org/licenses/LICENSE-2.0
10             #
11             # Unless required by applicable law or agreed to in writing,
12             # software distributed under the License is distributed on an
13             # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14             # KIND, either express or implied. See the License for the
15             # specific language governing permissions and limitations
16             # under the License.
17              
18             package Search::Elasticsearch::CxnPool::Async::Static;
19             $Search::Elasticsearch::CxnPool::Async::Static::VERSION = '8.00';
20 32     32   598574 use Moo;
  32         82  
  32         205  
21             with 'Search::Elasticsearch::Role::CxnPool::Static',
22             'Search::Elasticsearch::Role::Is_Async';
23              
24 32     32   11545 use Search::Elasticsearch::Util qw(new_error);
  32         95  
  32         290  
25 32     32   10273 use Scalar::Util qw(weaken);
  32         89  
  32         1928  
26 32     32   232 use Promises qw(deferred);
  32         84  
  32         262  
27 32     32   7974 use namespace::clean;
  32         81  
  32         199  
28              
29             #===================================
30             sub next_cxn {
31             #===================================
32 54     54 1 129 my ($self) = @_;
33              
34 54         156 my $cxns = $self->cxns;
35 54         110 my $now = time();
36 54         129 my $deferred = deferred;
37              
38 54         613 my ( %seen, @skipped, $weak_find_cxn );
39              
40             my $find_cxn = sub {
41 68     68   1952 my $total = @$cxns;
42 68         164 my $found;
43              
44 68 100       233 if ( $total > keys %seen ) {
45              
46             # we haven't seen all cxns yet
47 61         196 while ( $total-- ) {
48 64         227 my $cxn = $cxns->[ $self->next_cxn_num ];
49 64 100       971 next if $seen{$cxn}++;
50              
51 63 100       241 return $deferred->resolve($cxn)
52             if $cxn->is_live;
53              
54 49 100       402 if ( $cxn->next_ping <= time() ) {
55 40         82 $found = $cxn;
56 40         97 last;
57             }
58              
59 9         32 push @skipped, $cxn;
60             }
61             }
62              
63 54 100 100     242 if ( $found ||= shift @skipped ) {
64             return $found->pings_ok->then(
65 33         4257 sub { $deferred->resolve($found) }, # success
66 47         181 $weak_find_cxn # resolve
67             );
68             }
69              
70 7         36 $_->force_ping for @$cxns;
71              
72 7         103 return $deferred->reject(
73             new_error(
74             "NoNodes", "No nodes are available: [" . $self->cxns_str . ']'
75             )
76             );
77              
78 54         271 };
79 54         252 weaken( $weak_find_cxn = $find_cxn );
80              
81 54         151 $find_cxn->();
82 54         10199 $deferred->promise;
83             }
84              
85             1;
86              
87             # ABSTRACT: An async CxnPool for connecting to a remote cluster with a static list of nodes.
88              
89             __END__