File Coverage

blib/lib/Search/Elasticsearch/Transport/Async.pm
Criterion Covered Total %
statement 43 43 100.0
branch 6 6 100.0
condition n/a
subroutine 10 10 100.0
pod 1 1 100.0
total 60 60 100.0


line stmt bran cond sub pod time code
1             # Licensed to Elasticsearch B.V. under one or more contributor
2             # license agreements. See the NOTICE file distributed with
3             # this work for additional information regarding copyright
4             # ownership. Elasticsearch B.V. licenses this file to you under
5             # the Apache License, Version 2.0 (the "License"); you may
6             # not use this file except in compliance with the License.
7             # You may obtain a copy of the License at
8             #
9             # http://www.apache.org/licenses/LICENSE-2.0
10             #
11             # Unless required by applicable law or agreed to in writing,
12             # software distributed under the License is distributed on an
13             # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14             # KIND, either express or implied. See the License for the
15             # specific language governing permissions and limitations
16             # under the License.
17              
18             package Search::Elasticsearch::Transport::Async;
19             $Search::Elasticsearch::Transport::Async::VERSION = '8.00';
20 50     50   939636 use Moo;
  50         147  
  50         312  
21             with 'Search::Elasticsearch::Role::Is_Async',
22             'Search::Elasticsearch::Role::Transport';
23              
24 50     50   17302 use Time::HiRes qw(time);
  50         170  
  50         415  
25 50     50   5818 use Search::Elasticsearch::Util qw(upgrade_error);
  50         171  
  50         408  
26 50     50   15685 use Promises qw(deferred);
  50         145  
  50         357  
27 50     50   12397 use namespace::clean;
  50         131  
  50         421  
28              
29             #===================================
30             sub perform_request {
31             #===================================
32 160     160 1 158489 my $self = shift;
33 160         693 my $params = $self->tidy_request(@_);
34 160         4601 my $pool = $self->cxn_pool;
35 160         465 my $logger = $self->logger;
36              
37 160         547 my $deferred = deferred;
38              
39 160         2461 my ( $start, $cxn );
40             $pool->next_cxn
41              
42             # perform request
43             ->then(
44             sub {
45 148     148   16200 $cxn = shift;
46 148         518 $start = time();
47 148         3786 $cxn->perform_request($params);
48             }
49             )
50              
51             # log request regardless of success/failure
52 160     160   76212 ->finally( sub { $logger->trace_request( $cxn, $params ) } )
53              
54             ->done(
55             # request succeeded
56             sub {
57 114     114   23351 my ( $code, $response ) = @_;
58 114         526 $pool->request_ok($cxn);
59 114         5747 $logger->trace_response( $cxn, $code, $response,
60             time() - $start );
61 114         6260 $deferred->resolve($response);
62             },
63              
64             # request failed
65             sub {
66 46     46   8615 my $error = upgrade_error( shift(), { request => $params } );
67 46 100       660 if ( $pool->request_failed( $cxn, $error ) ) {
68              
69             # log failed, then retry
70 19         4729 $logger->debugf( "[%s] %s", $cxn->stringify, "$error" );
71 19         1538 $logger->info('Retrying request on a new cxn');
72             return $self->perform_request($params)->done(
73 9         487 sub { $deferred->resolve(@_) },
74 10         568 sub { $deferred->reject(@_) }
75 19         624 );
76             }
77 27 100       3361 if ($cxn) {
78 15         105 $logger->trace_request( $cxn, $params );
79 15         830 $logger->trace_error( $cxn, $error );
80             }
81 27 100       779 $error->is('NoNodes')
82             ? $logger->critical($error)
83             : $logger->error($error);
84 27         1735 $deferred->reject($error);
85             }
86 160         1753 );
87 160         7267 return $deferred->promise;
88             }
89              
90             1;
91              
92             #ABSTRACT: Provides async interface between the client class and the Elasticsearch cluster
93              
94             __END__