File Coverage

blib/lib/Search/Elasticsearch/CxnPool/Async/Simple/Sniff.pm
Criterion Covered Total %
statement 12 67 17.9
branch 0 20 0.0
condition n/a
subroutine 4 13 30.7
pod 0 3 0.0
total 16 103 15.5


line stmt bran cond sub pod time code
1             package Search::Elasticsearch::CxnPool::Async::Simple::Sniff;
2              
3 1     1   25257 use Moo;
  1         16828  
  1         7  
4              
5 1     1   2699 use Search::Elasticsearch::Role::Is_Async::Loader ();
  1         3  
  1         22  
6 1     1   849 use Search::Elasticsearch::Util qw(new_error);
  1         35515  
  1         8  
7              
8 1     1   209 use namespace::clean;
  1         2  
  1         9  
9              
10             with 'Search::Elasticsearch::Role::CxnPool::Sniff',
11             'Search::Elasticsearch::Role::Is_Async';
12              
13              
14             sub next_cxn {
15 0     0 0   my ($self, $cb) = @_;
16              
17 0 0         if ($self->next_sniff <= time()) {
18             $self->sniff(sub {
19 0     0     $self->_next_cxn($cb);
20 0           });
21             }
22             else {
23 0           $self->_next_cxn($cb);
24             }
25              
26 0           return;
27             }
28              
29             sub _next_cxn {
30 0     0     my ($self, $cb) = @_;
31              
32 0           my $cxns = $self->cxns;
33 0           my $cnt = @$cxns;
34              
35 0           while (0 < $cnt--) {
36 0           my $cxn = $cxns->[$self->next_cxn_num];
37              
38 0 0         return $cb->($cxn) if $cxn->is_live;
39             }
40              
41 0           local $@ = new_error('NoNodes', 'No nodes are available: [' . $self->cxns_seeds_str . ']');
42              
43 0           $cb->();
44             }
45              
46             sub sniff {
47 0     0 0   my ($self, $cb) = @_;
48              
49 0           my $cxns = $self->cxns;
50 0           my (%seen, @skip);
51              
52 0           my $sub; $sub = sub {
53 0     0     my $cnt = @$cxns;
54 0           my $fnd;
55              
56 0 0         if ($cnt > keys(%seen)) {
57 0           while ($cnt--) {
58 0           my $cxn = $cxns->[$self->next_cxn_num];
59              
60 0 0         next if $seen{$cxn}++;
61              
62 0 0         if ($cxn->is_dead) {
63 0           push(@skip, $cxn);
64             }
65             else {
66             return $self->sniff_cxn($cxn, sub {
67 0 0         return $cb->() if $_[0];
68 0           $cxn->mark_dead();
69 0           $sub->();
70 0           });
71             }
72             }
73             }
74              
75 0 0         if (my $cxn = shift(@skip)) {
76             return $self->sniff_cxn($cxn, sub {
77 0 0         return $cb->() if $_[0];
78 0           $sub->();
79 0           });
80             }
81              
82 0           $self->logger->infof('No live nodes available. Trying seed nodes.');
83 0           $self->_sniff_seed_nodes($cb);
84 0           };
85              
86 0           $sub->();
87              
88 0           return;
89             }
90              
91             sub _sniff_seed_nodes {
92 0     0     my ($self, $cb) = @_;
93              
94 0           my $idx = 0;
95              
96 0           my $sub; $sub = sub {
97 0     0     my $nods = $self->seed_nodes;
98              
99 0 0         return $cb->() if $idx >= @$nods;
100              
101 0           my $cxn = $self->cxn_factory->new_cxn($nods->[$idx++]);
102              
103             $self->sniff_cxn($cxn, sub {
104 0 0         return $cb->() if $_[0];
105 0           $sub->();
106 0           });
107 0           };
108              
109 0           $sub->();
110              
111 0           return;
112             }
113              
114             sub sniff_cxn {
115 0     0 0   my ($self, $cxn, $cb) = @_;
116              
117             $cxn->sniff(sub {
118 0     0     $cb->($self->parse_sniff($cxn->protocol, $_[0]));
119 0           });
120              
121 0           return;
122             }
123              
124              
125             1;
126              
127              
128             __END__