File Coverage

blib/lib/MongoDB/_Dispatcher.pm
Criterion Covered Total %
statement 34 132 25.7
branch 0 54 0.0
condition 0 57 0.0
subroutine 12 21 57.1
pod 0 5 0.0
total 46 269 17.1


line stmt bran cond sub pod time code
1             # Copyright 2018 - present MongoDB, Inc.
2             #
3             # Licensed under the Apache License, Version 2.0 (the "License");
4             # you may not use this file except in compliance with the License.
5             # You may obtain a copy of the License at
6             #
7             # http://www.apache.org/licenses/LICENSE-2.0
8             #
9             # Unless required by applicable law or agreed to in writing, software
10             # distributed under the License is distributed on an "AS IS" BASIS,
11             # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12             # See the License for the specific language governing permissions and
13             # limitations under the License.
14              
15 58     58   426 use strict;
  58         145  
  58         1911  
16 58     58   376 use warnings;
  58         138  
  58         2246  
17             package MongoDB::_Dispatcher;
18              
19             # Encapsulate op dispatching; breaking this out from client
20             # allows avoiding circular references with the session pool class.
21              
22 58     58   361 use version;
  58         126  
  58         441  
23             our $VERSION = 'v2.2.0';
24              
25 58     58   4721 use Moo;
  58         142  
  58         451  
26 58     58   21116 use MongoDB::_Constants;
  58         149  
  58         7614  
27 58         514 use MongoDB::_Types qw(
28             Boolish
29 58     58   487 );
  58         177  
30 58     58   63503 use Carp;
  58         153  
  58         3492  
31 58         410 use Types::Standard qw(
32             InstanceOf
33 58     58   392 );
  58         181  
34 58     58   40139 use Safe::Isa;
  58         135  
  58         6711  
35              
36 58     58   425 use namespace::clean;
  58         127  
  58         447  
37              
38             has topology => (
39             is => 'ro',
40             required => 1,
41             isa => InstanceOf ['MongoDB::_Topology'],
42             );
43              
44             has retry_writes => (
45             is => 'ro',
46             required => 1,
47             isa => Boolish,
48             );
49              
50             has retry_reads => (
51             is => 'ro',
52             required => 1,
53             isa => Boolish,
54             );
55              
56             # Reset session state if we're outside an active transaction, otherwise set
57             # that this transaction actually has operations
58             sub _maybe_update_session_state {
59 0     0     my ( $self, $op ) = @_;
60 0 0 0       if ( defined $op->session && ! $op->session->_active_transaction ) {
    0          
61 0           $op->session->_set__transaction_state( TXN_NONE );
62             } elsif ( defined $op->session ) {
63 0           $op->session->_set__has_transaction_operations( 1 );
64             }
65             }
66              
67             # op dispatcher written in highly optimized style
68             sub send_direct_op {
69 0     0 0   my ( $self, $op, $address ) = @_;
70 0           my ( $link, $result );
71              
72 0           $self->_maybe_update_session_state( $op );
73              
74             ( $link = $self->{topology}->get_specific_link( $address, $op ) ), (
75 0   0       eval { ($result) = $op->execute($link); 1 } or do {
76             my $err = length($@) ? $@ : "caught error, but it was lost in eval unwind";
77             if ( $err->$_isa("MongoDB::ConnectionError") || $err->$_isa("MongoDB::NetworkTimeout") ) {
78             $self->{topology}->mark_server_unknown( $link->server, $err );
79             }
80             elsif ( $err->$_isa("MongoDB::NotMasterError") ) {
81             $self->{topology}->mark_server_unknown( $link->server, $err );
82             $self->{topology}->mark_stale;
83             }
84             # regardless of cleanup, rethrow the error
85             WITH_ASSERTS ? ( confess $err ) : ( die $err );
86             }
87             ),
88             return $result;
89             }
90              
91             sub _retrieve_link_for {
92 0     0     my ( $self, $op, $rw ) = @_;
93 0           my $topology = $self->{'topology'};
94 0           my $link;
95 0 0 0       if ( $op->session
    0 0        
      0        
96             && $op->session->_address # no point trying if theres no address....
97             && $op->session->_active_transaction # this is true during a transaction and on every commit
98             && $topology->_supports_mongos_pinning_transactions )
99             {
100 0           $link = $topology->get_specific_link( $op->session->_address, $op );
101             }
102             elsif ( $rw eq 'w' ) {
103 0           $link = $topology->get_writable_link( $op );
104             } else {
105 0           $link = $topology->get_readable_link( $op );
106             }
107 0           return $link;
108             }
109              
110             # op dispatcher written in highly optimized style
111             sub send_write_op {
112 0     0 0   my ( $self, $op ) = @_;
113 0           my ( $link, $result );
114              
115 0           $self->_maybe_update_session_state( $op );
116              
117             ( $link = $self->_retrieve_link_for( $op, 'w' ) ), (
118 0   0       eval { ($result) = $self->_try_op_for_link( $link, $op ); 1 } or do {
119             my $err = length($@) ? $@ : "caught error, but it was lost in eval unwind";
120             WITH_ASSERTS ? ( confess $err ) : ( die $err );
121             }
122             ),
123             return $result;
124             }
125              
126             # Sometimes, seeing an op dispatched as "send_write_op" is confusing when
127             # really, we're just insisting that it be sent only to a primary or
128             # directly connected server.
129             BEGIN {
130 58     58   65459 no warnings 'once';
  58         152  
  58         2548  
131 58     58   69530 *send_primary_op = \&send_write_op;
132             }
133              
134             sub send_retryable_write_op {
135 0     0 0   my ( $self, $op, $force ) = @_;
136 0           my ( $link, $result ) = ( $self->_retrieve_link_for( $op, 'w' ) );
137              
138 0           $self->_maybe_update_session_state( $op );
139              
140             # Need to force to do a retryable write on a Transaction Commit or Abort.
141             # $force is an override for retry_writes, but theres no point trying that
142             # if the link doesnt support it anyway.
143             # This triggers on the following:
144             # * $force is not set to 'force'
145             # (specifically for retrying writes in ending transaction operations)
146             # * retry writes is not enabled or the link doesnt support retryWrites
147             # * if an active transaction is starting or in progress
148 0 0 0       unless ( $link->supports_retryWrites
      0        
      0        
      0        
149             && ( $self->retry_writes || ( defined $force && $force eq 'force' ) )
150             && ( defined $op->session
151             && ! $op->session->_in_transaction_state( TXN_STARTING, TXN_IN_PROGRESS )
152             )
153             ) {
154 0 0         eval { ($result) = $self->_try_op_for_link( $link, $op ); 1 } or do {
  0            
  0            
155 0 0         my $err = length($@) ? $@ : "caught error, but it was lost in eval unwind";
156 0           WITH_ASSERTS ? ( confess $err ) : ( die $err );
157             };
158 0           return $result;
159             }
160              
161             # If we get this far and there is no session, then somethings gone really
162             # wrong, so probably not worth worrying about.
163              
164             # increment transaction id before write, but otherwise is the same for both
165             # attempts. If not in a transaction, is a no-op
166 0           $op->session->_increment_transaction_id;
167 0           $op->retryable_write( 1 );
168              
169             # attempt the op the first time
170 0 0         eval { ($result) = $self->_try_op_for_link( $link, $op ); 1 } or do {
  0            
  0            
171 0 0         my $err = length($@) ? $@ : "caught error, but it was lost in eval unwind";
172              
173 0 0         if ( $err->$_call_if_can('_is_storage_engine_not_retryable') ) {
174             # Break encapsulation to rewrite the message, then rethrow.
175 0           $err->{message} = "This MongoDB deployment does not support retryable writes. Please add retryWrites=false to your connection string.";
176 0           die $err;
177             }
178              
179             # If the error is not retryable, then drop out
180 0 0         unless ( $err->$_call_if_can('_is_retryable') ) {
181 0           WITH_ASSERTS ? ( confess $err ) : ( die $err );
182             }
183              
184             # Must check if error is retryable before getting the link, in case we
185             # get a 'no writable servers' error. In the case of a mongos retry,
186             # this will end up as the same server by design.
187 0           my $retry_link = $self->_retrieve_link_for( $op, 'w' );
188              
189             # Rare chance that the new link is not retryable
190 0 0         unless ( $retry_link->supports_retryWrites ) {
191 0           WITH_ASSERTS ? ( confess $err ) : ( die $err );
192             }
193              
194             # Second attempt
195 0 0         eval { ($result) = $self->_try_op_for_link( $retry_link, $op ); 1 } or do {
  0            
  0            
196 0 0         my $retry_err = length($@) ? $@ : "caught error, but it was lost in eval unwind";
197 0           WITH_ASSERTS ? ( confess $retry_err ) : ( die $retry_err );
198             };
199             };
200             # just in case this gets reused for some reason
201 0           $op->retryable_write( 0 );
202 0           return $result;
203             }
204              
205             sub _is_primary_stepdown {
206 0     0     my ($self, $err, $link) = @_;
207 0           my $err_info = $err->{result}->{output};
208 0           my $err_code_name = '';
209 0 0         $err_code_name = $err_info->{'codeName'} if defined $err_info->{'codeName'};
210 0           my @other_errors = qw(ShutdownInProgress InterruptedAtShutdown);
211 0   0       my $not_master = (
212             $err->$_isa('MongoDB::NotMasterError')
213             || ( $err_info && $err_code_name eq 'NotMaster' )
214             ) && $link->max_wire_version < 8;
215             return (
216 0   0       $err_info && grep { $err_code_name eq $_ } @other_errors
217             ) || $not_master;
218             }
219              
220             # op dispatcher written in highly optimized style
221             sub _try_op_for_link {
222 0     0     my ( $self, $link, $op ) = @_;
223 0           my $result;
224             (
225 0   0       eval { ($result) = $op->execute($link, $self->{topology}->type); 1 } or do {
226             my $err = length($@) ? $@ : "caught error, but it was lost in eval unwind";
227             if ( $err->$_isa("MongoDB::ConnectionError") || $err->$_isa("MongoDB::NetworkTimeout") ) {
228             $self->{topology}->mark_server_unknown( $link->server, $err );
229             }
230             elsif ( $self->_is_primary_stepdown($err, $link) ) {
231             $self->{topology}->mark_server_unknown( $link->server, $err );
232             $self->{topology}->mark_stale;
233             }
234             # normal die here instead of assert, which is used later
235             die $err;
236             }
237             ),
238             return $result;
239             }
240              
241             sub send_retryable_read_op {
242 0     0 0   my ( $self, $op ) = @_;
243 0           my $result;
244              
245             # Get transaction read preference if in a transaction.
246 0 0 0       if ( defined $op->session && $op->session->_active_transaction ) {
247             # Transactions may only read from primary in MongoDB 4.0, so get and
248             # check the read preference from the transaction settings as per
249             # transaction spec - see MongoDB::_TransactionOptions
250 0           $op->read_preference( $op->session->_get_transaction_read_preference );
251             }
252              
253 0           my $link = $self->_retrieve_link_for( $op, 'r' );
254              
255 0           $self->_maybe_update_session_state( $op );
256              
257 0 0 0       if ( ! $link->supports_retryReads
      0        
      0        
258             || ! $self->retry_reads
259             || ( defined $op->session && $op->session->_in_transaction_state( TXN_STARTING, TXN_IN_PROGRESS ))
260             ) {
261 0 0         eval { ($result) = $self->_try_op_for_link( $link, $op ); 1 } or do {
  0            
  0            
262 0 0         my $err = length($@) ? $@ : "caught error, but it was lost in eval unwind";
263 0           WITH_ASSERTS ? ( confess $err ) : ( die $err );
264             };
265 0           return $result;
266             }
267              
268 0 0         $op->session->_increment_transaction_id if $op->session;
269              
270 0           $op->retryable_read( 1 );
271             # attempt the op the first time
272 0 0         eval { ($result) = $self->_try_op_for_link( $link, $op ); 1 } or do {
  0            
  0            
273 0 0         my $err = length($@) ? $@ : "caught error, but it was lost in eval unwind";
274              
275             # If the error is not retryable, then drop out
276 0 0         unless ( $err->$_call_if_can('_is_retryable') ) {
277 0           WITH_ASSERTS ? ( confess $err ) : ( die $err );
278             }
279              
280 0           my $retry_link = $self->_retrieve_link_for( $op, 'r' );
281              
282             # Rare chance that the new link is not retryable
283 0 0         unless ( $retry_link->supports_retryReads ) {
284 0           WITH_ASSERTS ? ( confess $err ) : ( die $err );
285             }
286              
287             # Second attempt
288 0 0         eval { ($result) = $self->_try_op_for_link( $retry_link, $op ); 1 } or do {
  0            
  0            
289 0 0         my $retry_err = length($@) ? $@ : "caught error, but it was lost in eval unwind";
290 0           WITH_ASSERTS ? ( confess $retry_err ) : ( die $retry_err );
291             };
292             };
293             # just in case this gets reused for some reason
294 0           $op->retryable_read( 0 );
295              
296 0           return $result;
297             }
298              
299             # op dispatcher written in highly optimized style
300             sub send_read_op {
301 0     0 0   my ( $self, $op ) = @_;
302 0           my ( $link, $type, $result );
303              
304             # Get transaction read preference if in a transaction.
305 0 0 0       if ( defined $op->session && $op->session->_active_transaction ) {
306             # Transactions may only read from primary in MongoDB 4.0, so get and
307             # check the read preference from the transaction settings as per
308             # transaction spec - see MongoDB::_TransactionOptions
309 0           $op->read_preference( $op->session->_get_transaction_read_preference );
310             }
311              
312 0           $self->_maybe_update_session_state( $op );
313              
314             ( $link = $self->_retrieve_link_for( $op, 'r' ) ),
315             ( $type = $self->{topology}->type ), (
316 0   0       eval { ($result) = $op->execute( $link, $type ); 1 } or do {
317             my $err = length($@) ? $@ : "caught error, but it was lost in eval unwind";
318             if ( $err->$_isa("MongoDB::ConnectionError") || $err->$_isa("MongoDB::NetworkTimeout") ) {
319             $self->{topology}->mark_server_unknown( $link->server, $err );
320             }
321             elsif ( $err->$_isa("MongoDB::NotMasterError") ) {
322             $self->{topology}->mark_server_unknown( $link->server, $err );
323             $self->{topology}->mark_stale;
324             }
325             # regardless of cleanup, rethrow the error
326             WITH_ASSERTS ? ( confess $err ) : ( die $err );
327             }
328             ),
329             return $result;
330             }
331              
332             1;