File Coverage

blib/lib/Search/Elasticsearch/Transport/Async.pm
Criterion Covered Total %
statement 43 43 100.0
branch 6 6 100.0
condition n/a
subroutine 10 10 100.0
pod 1 1 100.0
total 60 60 100.0


line stmt bran cond sub pod time code
1             # Licensed to Elasticsearch B.V. under one or more contributor
2             # license agreements. See the NOTICE file distributed with
3             # this work for additional information regarding copyright
4             # ownership. Elasticsearch B.V. licenses this file to you under
5             # the Apache License, Version 2.0 (the "License"); you may
6             # not use this file except in compliance with the License.
7             # You may obtain a copy of the License at
8             #
9             # http://www.apache.org/licenses/LICENSE-2.0
10             #
11             # Unless required by applicable law or agreed to in writing,
12             # software distributed under the License is distributed on an
13             # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14             # KIND, either express or implied. See the License for the
15             # specific language governing permissions and limitations
16             # under the License.
17              
18             package Search::Elasticsearch::Transport::Async;
19             $Search::Elasticsearch::Transport::Async::VERSION = '7.715';
20 50     50   828604 use Moo;
  50         129  
  50         297  
21             with 'Search::Elasticsearch::Role::Is_Async',
22             'Search::Elasticsearch::Role::Transport';
23              
24 50     50   15552 use Time::HiRes qw(time);
  50         119  
  50         447  
25 50     50   5787 use Search::Elasticsearch::Util qw(upgrade_error);
  50         120  
  50         434  
26 50     50   15054 use Promises qw(deferred);
  50         105  
  50         362  
27 50     50   11092 use namespace::clean;
  50         105  
  50         371  
28              
29             #===================================
30             sub perform_request {
31             #===================================
32 160     160 1 134283 my $self = shift;
33 160         611 my $params = $self->tidy_request(@_);
34 160         4087 my $pool = $self->cxn_pool;
35 160         494 my $logger = $self->logger;
36              
37 160         498 my $deferred = deferred;
38              
39 160         2411 my ( $start, $cxn );
40             $pool->next_cxn
41              
42             # perform request
43             ->then(
44             sub {
45 148     148   13072 $cxn = shift;
46 148         451 $start = time();
47 148         3130 $cxn->perform_request($params);
48             }
49             )
50              
51             # log request regardless of success/failure
52 160     160   62592 ->finally( sub { $logger->trace_request( $cxn, $params ) } )
53              
54             ->done(
55             # request succeeded
56             sub {
57 114     114   18869 my ( $code, $response ) = @_;
58 114         458 $pool->request_ok($cxn);
59 114         4659 $logger->trace_response( $cxn, $code, $response,
60             time() - $start );
61 114         5176 $deferred->resolve($response);
62             },
63              
64             # request failed
65             sub {
66 46     46   7078 my $error = upgrade_error( shift(), { request => $params } );
67 46 100       533 if ( $pool->request_failed( $cxn, $error ) ) {
68              
69             # log failed, then retry
70 19         3978 $logger->debugf( "[%s] %s", $cxn->stringify, "$error" );
71 19         1403 $logger->info('Retrying request on a new cxn');
72             return $self->perform_request($params)->done(
73 9         396 sub { $deferred->resolve(@_) },
74 10         445 sub { $deferred->reject(@_) }
75 19         480 );
76             }
77 27 100       3093 if ($cxn) {
78 15         76 $logger->trace_request( $cxn, $params );
79 15         692 $logger->trace_error( $cxn, $error );
80             }
81 27 100       689 $error->is('NoNodes')
82             ? $logger->critical($error)
83             : $logger->error($error);
84 27         1381 $deferred->reject($error);
85             }
86 160         1463 );
87 160         6005 return $deferred->promise;
88             }
89              
90             1;
91              
92             #ABSTRACT: Provides async interface between the client class and the Elasticsearch cluster
93              
94             __END__