File Coverage

blib/lib/MongoDB/_Topology.pm
Criterion Covered Total %
statement 431 641 67.2
branch 157 318 49.3
condition 61 143 42.6
subroutine 68 88 77.2
pod 0 14 0.0
total 717 1204 59.5


line stmt bran cond sub pod time code
1             # Copyright 2014 - present MongoDB, Inc.
2             #
3             # Licensed under the Apache License, Version 2.0 (the "License");
4             # you may not use this file except in compliance with the License.
5             # You may obtain a copy of the License at
6             #
7             # http://www.apache.org/licenses/LICENSE-2.0
8             #
9             # Unless required by applicable law or agreed to in writing, software
10             # distributed under the License is distributed on an "AS IS" BASIS,
11             # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12             # See the License for the specific language governing permissions and
13             # limitations under the License.
14              
15 60     60   1051 use strict;
  60         149  
  60         1877  
16 60     60   316 use warnings;
  60         139  
  60         1950  
17             package MongoDB::_Topology;
18              
19 60     60   338 use version;
  60         143  
  60         392  
20             our $VERSION = 'v2.2.1';
21              
22 60     60   4568 use Moo;
  60         153  
  60         384  
23 60     60   21377 use BSON;
  60         178  
  60         2859  
24 60     60   386 use MongoDB::Error;
  60         157  
  60         6235  
25 60     60   440 use MongoDB::Op::_Command;
  60         130  
  60         1719  
26 60     60   24012 use MongoDB::_Platform;
  60         190  
  60         1686  
27 60     60   443 use MongoDB::ReadPreference;
  60         131  
  60         1209  
28 60     60   314 use MongoDB::_Constants;
  60         134  
  60         6477  
29 60     60   27893 use MongoDB::_Link;
  60         242  
  60         3001  
30 60         635 use MongoDB::_Types qw(
31             Boolish
32             BSONCodec
33             CompressionType
34             Document
35             NonNegNum
36             TopologyType
37             ZlibCompressionLevel
38             to_IxHash
39 60     60   577 );
  60         145  
40 60         570 use Types::Standard qw(
41             CodeRef
42             HashRef
43             ArrayRef
44             InstanceOf
45             Num
46             Str
47             Maybe
48 60     60   122010 );
  60         161  
49 60     60   116812 use MongoDB::_Server;
  60         251  
  60         2334  
50 60     60   525 use MongoDB::_Protocol;
  60         149  
  60         1483  
51 60     60   365 use Config;
  60         143  
  60         2648  
52 60     60   363 use List::Util qw/first max min/;
  60         158  
  60         4489  
53 60     60   466 use Safe::Isa;
  60         143  
  60         7451  
54 60     60   439 use Time::HiRes qw/time usleep/;
  60         166  
  60         382  
55              
56 60     60   8125 use namespace::clean;
  60         163  
  60         396  
57              
58             with $_ for qw(
59             MongoDB::Role::_TopologyMonitoring
60             );
61              
62             #--------------------------------------------------------------------------#
63             # attributes
64             #--------------------------------------------------------------------------#
65              
66             has uri => (
67             is => 'ro',
68             required => 1,
69             isa => InstanceOf['MongoDB::_URI'],
70             );
71              
72             has min_server_version => (
73             is => 'ro',
74             required => 1,
75             isa => Str,
76             );
77              
78             has max_wire_version => (
79             is => 'ro',
80             required => 1,
81             isa => Num,
82             );
83              
84             has min_wire_version => (
85             is => 'ro',
86             required => 1,
87             isa => Num,
88             );
89              
90             has credential => (
91             is => 'ro',
92             required => 1,
93             isa => InstanceOf['MongoDB::_Credential'],
94             );
95              
96             # Required so it's passed explicitly, even if undef, to ensure it's wired
97             # up correctly.
98             has monitoring_callback => (
99             is => 'ro',
100             required => 1,
101             isa => Maybe[CodeRef],
102             );
103              
104             has compressors => (
105             is => 'ro',
106             isa => ArrayRef[CompressionType],
107             default => sub { [] },
108             );
109              
110             has zlib_compression_level => (
111             is => 'ro',
112             isa => ZlibCompressionLevel,
113             default => sub { -1 },
114             );
115              
116             has type => (
117             is => 'ro',
118             writer => '_set_type',
119             default => 'Unknown',
120             isa => TopologyType,
121             );
122              
123             has app_name => (
124             is => 'ro',
125             default => '',
126             isa => Str,
127             );
128              
129             has replica_set_name => (
130             is => 'ro',
131             default => '',
132             writer => '_set_replica_set_name', # :-)
133             isa => Str,
134             );
135              
136             has heartbeat_frequency_sec => (
137             is => 'ro',
138             default => 60,
139             isa => NonNegNum,
140             );
141              
142             has last_scan_time => (
143             is => 'ro',
144             default => EPOCH,
145             writer => '_set_last_scan_time',
146             isa => Num,
147             );
148              
149             has local_threshold_sec => (
150             is => 'ro',
151             default => 0.015,
152             isa => Num,
153             );
154              
155             has logical_session_timeout_minutes => (
156             is => 'ro',
157             default => undef,
158             writer => '_set_logical_session_timeout_minutes',
159             isa => Maybe [NonNegNum],
160             );
161              
162             has next_scan_time => (
163             is => 'ro',
164             default => sub { time() },
165             writer => '_set_next_scan_time',
166             isa => Num,
167             );
168              
169             has socket_check_interval_sec => (
170             is => 'ro',
171             default => 5,
172             isa => Num,
173             );
174              
175             has server_selection_timeout_sec => (
176             is => 'ro',
177             default => 60,
178             isa => Num,
179             );
180              
181             has server_selection_try_once => (
182             is => 'ro',
183             default => 1,
184             isa => Boolish,
185             );
186              
187             has server_selector => (
188             is => 'ro',
189             isa => Maybe[CodeRef],
190             );
191              
192             has ewma_alpha => (
193             is => 'ro',
194             default => 0.2,
195             isa => Num,
196             );
197              
198             has link_options => (
199             is => 'ro',
200             default => sub { {} },
201             isa => HashRef,
202             );
203              
204             has bson_codec => (
205             is => 'ro',
206             default => sub { BSON->new },
207             isa => BSONCodec,
208             init_arg => undef,
209             );
210              
211             has number_of_seeds => (
212             is => 'lazy',
213             builder => '_build_number_of_seeds',
214             isa => Num,
215             );
216              
217             has max_election_id => (
218             is => 'rw',
219             writer => '_set_max_election_id',
220             );
221              
222             has max_set_version => (
223             is => 'rw',
224             isa => Maybe [Num],
225             writer => '_set_max_set_version',
226             );
227              
228             # generated only once per _Topology, for performance
229             has handshake_document => (
230             is => 'lazy',
231             isa => Document,
232             builder => '_build_handshake_document',
233             );
234              
235             # compatible wire protocol
236             has is_compatible => (
237             is => 'ro',
238             writer => '_set_is_compatible',
239             isa => Boolish,
240             );
241              
242             has compatibility_error => (
243             is => 'ro',
244             default => '',
245             writer => '_set_compatibility_error',
246             isa => Str,
247             );
248              
249             has wire_version_floor => (
250             is => 'ro',
251             writer => '_set_wire_version_floor',
252             default => 0,
253             );
254              
255             has wire_version_ceil => (
256             is => 'ro',
257             writer => '_set_wire_version_ceil',
258             default => 0,
259             );
260              
261             has current_primary => (
262             is => 'rwp',
263             clearer => '_clear_current_primary',
264             init_arg => undef,
265             );
266              
267             has stale => (
268             is => 'rwp',
269             init_arg => undef,
270             default => 1,
271             );
272              
273             # servers, links and rtt_ewma_sec are all hashes on server address
274              
275             has servers => (
276             is => 'ro',
277             default => sub { {} },
278             isa => HashRef[InstanceOf['MongoDB::_Server']],
279             );
280              
281             has _incompatible_servers => (
282             is => 'rw',
283             default => sub { [] },
284             isa => ArrayRef[InstanceOf['MongoDB::_Server']],
285             );
286              
287             has links => (
288             is => 'ro',
289             default => sub { {} },
290             isa => HashRef[InstanceOf['MongoDB::_Link']],
291             );
292              
293             has rtt_ewma_sec => (
294             is => 'ro',
295             default => sub { {} },
296             isa => HashRef[Num],
297             );
298              
299             has cluster_time => (
300             is => 'rwp',
301             isa => Maybe[Document],
302             init_arg => undef,
303             default => undef,
304             );
305              
306             sub update_cluster_time {
307 0     0 0 0 my ( $self, $cluster_time ) = @_;
308              
309             return unless $cluster_time && exists $cluster_time->{clusterTime}
310 0 0 0     0 && ref($cluster_time->{clusterTime}) eq 'BSON::Timestamp';
      0        
311              
312             # Only update the cluster time if it is more recent than the current entry
313 0 0       0 if ( !defined $self->cluster_time ) {
    0          
314 0         0 $self->_set_cluster_time($cluster_time);
315             }
316             elsif ( $cluster_time->{'clusterTime'} > $self->cluster_time->{'clusterTime'} ) {
317 0         0 $self->_set_cluster_time($cluster_time);
318             }
319 0         0 return;
320             }
321              
322             #--------------------------------------------------------------------------#
323             # builders
324             #--------------------------------------------------------------------------#
325              
326             sub _build_number_of_seeds {
327 2     2   24 my ($self) = @_;
328 2         3 return scalar @{ $self->uri->hostids };
  2         37  
329             }
330              
331             sub _truncate_for_handshake {
332 0     0   0 my $str = shift;
333 0         0 return substr( $str, 0, 64 );
334             }
335              
336             sub _build_handshake_document {
337 0     0   0 my ($self) = @_;
338 0         0 my $driver_version_without_leading_v = substr( $VERSION, 1 );
339              
340 0 0       0 return to_IxHash(
341             [
342             ( length( $self->app_name ) ? ( application => { name => $self->app_name } ) : () ),
343             driver => to_IxHash(
344             [
345             name => "MongoDB Perl Driver",
346             version => $driver_version_without_leading_v,
347             ]
348             ),
349             os => { type => _truncate_for_handshake(MongoDB::_Platform::os_type) },
350             platform => _truncate_for_handshake(MongoDB::_Platform::platform_details)
351             ]
352             );
353             }
354              
355             sub BUILD {
356 331     331 0 9787 my ($self) = @_;
357              
358 331 100       1553 $self->publish_topology_opening
359             if $self->monitoring_callback;
360              
361 331 100       5635 $self->publish_old_topology_desc
362             if $self->monitoring_callback;
363 331         2265 my $type = $self->type;
364 331         563 my @addresses = @{ $self->uri->hostids };
  331         1510  
365              
366             # clone bson codec to disable dt_type
367 331         1664 $self->{bson_codec} = $self->bson_codec->clone( dt_type => undef );
368              
369 331 100       37946 if ( my $set_name = $self->replica_set_name ) {
370 47 100 66     270 if ( $type eq 'Single' || $type eq 'ReplicaSetNoPrimary' ) {
    50          
371             # these are valid, so nothing to do here
372             }
373             elsif ( $type eq 'Unknown' ) {
374 45         923 $self->_set_type('ReplicaSetNoPrimary');
375             }
376             else {
377 0         0 MongoDB::InternalError->throw(
378             "deployment with set name '$set_name' may not be initialized as type '$type'");
379             }
380             }
381              
382 331 50 66     2300 if ( $type eq 'Single' && @addresses > 1 ) {
383 0         0 MongoDB::InternalError->throw(
384             "topology type 'Single' cannot be used with multiple addresses: @addresses");
385             }
386              
387 331         1355 $self->_add_address_as_unknown($_) for @addresses;
388              
389 331 100       11226 $self->publish_new_topology_desc
390             if $self->monitoring_callback;
391              
392 331         10193 return;
393             }
394              
395             sub DEMOLISH {
396 331     331 0 901750 my $self = shift;
397              
398 331 100       35875 $self->publish_topology_closing
399             if $self->monitoring_callback;
400              
401 331         9280 return;
402             }
403              
404             sub _check_for_uri_changes {
405 54     54   215 my ($self) = @_;
406              
407 54         280 my $type = $self->type;
408             return unless
409 54 50 33     553 $type eq 'Sharded'
410             or $type eq 'Unknown';
411              
412 0         0 my @existing = @{ $self->uri->hostids };
  0         0  
413              
414             my $options = {
415             fallback_ttl_sec => $self->{heartbeat_frequency_sec},
416 0         0 };
417              
418 0 0       0 if ($self->uri->check_for_changes($options)) {
419 0         0 my %new = map { ($_, 1) } @{ $self->uri->hostids };
  0         0  
  0         0  
420 0         0 for my $address (@existing) {
421 0 0       0 if (!$new{$address}) {
422 0         0 $self->_remove_address($address);
423             }
424             else {
425 0         0 delete $new{$address};
426             }
427             }
428 0         0 for my $address (keys %new) {
429 0         0 $self->_add_address_as_unknown($address);
430             }
431             }
432             }
433              
434             #--------------------------------------------------------------------------#
435             # public methods
436             #--------------------------------------------------------------------------#
437              
438 4548     4548 0 9012 sub all_servers { return values %{ $_[0]->servers } }
  4548         30202  
439              
440 0     0 0 0 sub all_data_bearing_servers { return grep { $_->is_data_bearing } $_[0]->all_servers }
  0         0  
441              
442             sub check_address {
443 108     108 0 393 my ( $self, $address ) = @_;
444              
445 108         563 my $link = $self->links->{$address};
446 108 50 33     613 if ( $link && $link->is_connected ) {
447 0         0 $self->_update_topology_from_link( $link, with_handshake => 0 );
448             }
449             else {
450             # initialize_link will call update_topology_from_link
451 108         626 $self->_initialize_link($address);
452             }
453              
454 108         312 return;
455             }
456              
457             sub close_all_links {
458 0     0 0 0 my ($self) = @_;
459 0         0 delete $self->links->{ $_->address } for $self->all_servers;
460 0         0 return;
461             }
462              
463             sub _maybe_get_txn_error_labels_and_unpin_from {
464 54     54   615 my $op = shift;
465 54 50 33     1692 return () unless defined $op
466             && defined $op->session;
467 0 0       0 if ( $op->session->_in_transaction_state( TXN_STARTING, TXN_IN_PROGRESS ) ) {
    0          
468 0         0 $op->session->_unpin_address;
469 0         0 return ( error_labels => [ TXN_TRANSIENT_ERROR_MSG ] );
470             } elsif ( $op->session->_in_transaction_state( TXN_COMMITTED ) ) {
471 0         0 return ( error_labels => [ TXN_UNKNOWN_COMMIT_MSG ] );
472             }
473 0         0 return ();
474             }
475              
476             sub get_readable_link {
477 0     0 0 0 my ( $self, $op ) = @_;
478 0         0 $self->_check_for_uri_changes;
479              
480 0 0       0 my $read_pref = defined $op ? $op->read_preference : undef;
481              
482 0 0       0 my $mode = $read_pref ? lc $read_pref->mode : 'primary';
483 0 0       0 my $method =
    0          
484             $self->type eq "Single" ? '_find_available_server'
485             : $self->type eq "Sharded" ? '_find_readable_mongos_server'
486             : "_find_${mode}_server";
487              
488 0 0 0     0 if ( $mode eq 'primary' && $self->current_primary && $self->next_scan_time > time() )
      0        
489             {
490 0         0 my $link = $self->_get_server_link( $self->current_primary, $method );
491 0 0       0 return $link if $link;
492             }
493              
494 0         0 while ( my $server = $self->_selection_timeout( $method, $read_pref ) ) {
495 0         0 my $link = $self->_get_server_link( $server, $method, $read_pref );
496 0 0       0 if ($link) {
497             $self->_set_current_primary($server)
498             if $mode eq 'primary'
499             && ( $self->type eq "ReplicaSetWithPrimary"
500 0 0 0     0 || 1 == keys %{ $self->servers } );
      0        
501 0         0 return $link;
502             }
503             }
504              
505 0 0       0 my $rp = $read_pref ? $read_pref->as_string : 'primary';
506              
507 0         0 MongoDB::SelectionError->throw(
508             message => "No readable server available for matching read preference $rp. MongoDB server status:\n"
509             . $self->_status_string,
510             _maybe_get_txn_error_labels_and_unpin_from( $op ),
511             );
512             }
513              
514             sub get_specific_link {
515 0     0 0 0 my ( $self, $address, $op ) = @_;
516 0         0 $self->_check_for_uri_changes;
517              
518 0         0 my $server = $self->servers->{$address};
519 0 0 0     0 if ( $server && ( my $link = $self->_get_server_link($server) ) ) {
520 0         0 return $link;
521             }
522             else {
523 0         0 MongoDB::SelectionError->throw(
524             message => "Server $address is no longer available",
525             _maybe_get_txn_error_labels_and_unpin_from( $op ),
526             );
527             }
528             }
529              
530             sub get_writable_link {
531 54     54 0 977 my ( $self, $op ) = @_;
532 54         417 $self->_check_for_uri_changes;
533              
534 54 50 33     494 my $method =
535             ( $self->type eq "Single" || $self->type eq "Sharded" )
536             ? '_find_available_server'
537             : "_find_primary_server";
538              
539              
540 54 50 33     418 if ( $self->current_primary && $self->next_scan_time > time() ) {
541 0         0 my $link = $self->_get_server_link( $self->current_primary, $method );
542 0 0       0 return $link if $link;
543             }
544              
545 54         350 while ( my $server = $self->_selection_timeout($method) ) {
546 0         0 my $link = $self->_get_server_link( $server, $method );
547 0 0       0 if ($link) {
548             $self->_set_current_primary($server)
549             if $self->type eq "ReplicaSetWithPrimary"
550 0 0 0     0 || 1 == keys %{ $self->servers };
  0         0  
551 0         0 return $link;
552             }
553             }
554              
555             MongoDB::SelectionError->throw(
556 54         1156 message => "No writable server available. MongoDB server status:\n" . $self->_status_string,
557             _maybe_get_txn_error_labels_and_unpin_from( $op ),
558             );
559             }
560              
561             # Marking a server unknown from outside the topology indicates an operational
562             # error, so the last scan is set to EPOCH so that the next scan won't wait for
563             # the scanning cooldown.
564             sub mark_server_unknown {
565 0     0 0 0 my ( $self, $server, $error, $no_cooldown ) = @_;
566 0   0     0 $self->_reset_address_to_unknown( $server->address, $error, $no_cooldown // EPOCH );
567 0         0 return;
568             }
569              
570             sub mark_stale {
571 0     0 0 0 my ($self) = @_;
572 0         0 $self->_set_stale(1);
573 0         0 return;
574             }
575              
576             sub scan_all_servers {
577 1080     1080 0 12026 my ($self, $force) = @_;
578              
579 1080         4981 my ( $next, @ordinary, @to_check );
580 1080         10433 my $start_time = time;
581 1080 50       10464 my $cooldown_time = $force ? $start_time : $start_time - COOLDOWN_SECS;
582              
583             # anything not updated since scan start is eligible for a check; when all servers
584             # are updated, the loop terminates; Unknown servers aren't checked if
585             # they are in the cooldown window since we don't want to wait the connect
586             # timeout each attempt when they are unlikely to have changed status
587 1080         4677 while (1) {
588             @to_check =
589             grep {
590 1188 50       8973 $_->type eq 'Unknown'
  1188         97496  
591             ? !$_->updated_since($cooldown_time)
592             : !$_->updated_since($start_time)
593             } $self->all_servers;
594              
595 1188 100       9104 last unless @to_check;
596              
597 108 50   108   1475 if ( $next = first { $_->type eq 'RSPrimary' } @to_check ) {
  108 50       2331  
    50          
598 0         0 $self->check_address( $next->address );
599             }
600 108     108   3684 elsif ( $next = first { $_->type eq 'PossiblePrimary' } @to_check ) {
601 0         0 $self->check_address( $next->address );
602             }
603 108 50       2846 elsif ( @ordinary = grep { $_->type ne 'Unknown' && $_->type ne 'RSGhost' } @to_check ) {
604 0         0 $self->_check_oldest_server(@ordinary);
605             }
606             else {
607 108         1952 $self->_check_oldest_server(@to_check);
608             }
609             }
610              
611 1080         5434 my $now = time();
612 1080         29016 $self->_set_last_scan_time( $now );
613 1080         89741 $self->_set_next_scan_time( $now + $self->heartbeat_frequency_sec );
614 1080         49341 $self->_set_stale( 0 );
615 1080         6900 $self->_check_wire_versions;
616 1080         3698 return;
617             }
618              
619             sub status_struct {
620 0     0 0 0 my ($self) = @_;
621 0         0 my $status = { topology_type => $self->type, };
622 0 0       0 $status->{replica_set_name} = $self->replica_set_name if $self->replica_set_name;
623              
624             # convert from [sec, microsec] array to floating point
625 0         0 $status->{last_scan_time} = $self->last_scan_time;
626              
627 0         0 my $rtt_hash = $self->rtt_ewma_sec;
628 0         0 my $ss = $status->{servers} = [];
629 0         0 for my $server ( $self->all_servers ) {
630 0         0 my $addr = $server->address;
631 0         0 my $server_struct = $server->status_struct;
632 0 0       0 if ( defined $rtt_hash->{$addr} ) {
633 0         0 $server_struct->{ewma_rtt_sec} = $rtt_hash->{$addr};
634             }
635 0         0 push @$ss, $server_struct;
636             }
637 0         0 return $status;
638             }
639              
640             #--------------------------------------------------------------------------#
641             # private methods
642             #--------------------------------------------------------------------------#
643              
644             sub _add_address_as_unknown {
645 520     520   2668 my ( $self, $address, $last_update, $error ) = @_;
646 520 100       1469 $error = $error ? "$error" : "";
647 520         1192 $error =~ s/ at \S+ line \d+.*//ms;
648              
649 520 100       1984 $self->publish_server_opening($address)
650             if $self->monitoring_callback;
651              
652 520   100     18011 return $self->servers->{$address} = MongoDB::_Server->new(
653             address => $address,
654             last_update_time => $last_update || EPOCH,
655             error => $error,
656             );
657             }
658              
659             sub _check_for_primary {
660 121     121   253 my ($self) = @_;
661 121 100       264 if ( 0 == $self->_primaries ) {
662 17         438 $self->_set_type('ReplicaSetNoPrimary');
663 17         674 $self->_clear_current_primary;
664 17         97 return 0;
665             }
666 104         1087 return 1;
667             }
668              
669             sub _check_oldest_server {
670 108     108   441 my ( $self, @to_check ) = @_;
671              
672             my @ordered =
673 108         508 map { $_->[0] }
674 0 0       0 sort { $a->[1] <=> $b->[1] || rand() <=> rand() } # random if equal
675 108         412 map { [ $_, $_->last_update_time ] } # ignore partial secs
  108         934  
676             @to_check;
677              
678 108         972 $self->check_address( $ordered[0]->address );
679              
680 108         829 return;
681             }
682              
683             my $max_int32 = 2147483647;
684              
685             sub _check_wire_versions {
686 1235     1235   4604 my ($self) = @_;
687              
688 1235         3581 my $compat = 1;
689 1235         3501 my $min_seen = $max_int32;
690 1235         3531 my $max_seen = 0;
691 1235         5086 for my $server ( grep { $_->is_available } $self->all_servers ) {
  1392         39236  
692             my ( $server_min_wire_version, $server_max_wire_version ) =
693 219         6637 @{ $server->is_master }{qw/minWireVersion maxWireVersion/};
  219         577  
694              
695             # set to 0 as could be undefined. 0 is the equivalent to missing, and
696             # also kept as 0 for legacy compatibility.
697 219 100       493 $server_max_wire_version = 0 unless defined $server_max_wire_version;
698 219 100       418 $server_min_wire_version = 0 unless defined $server_min_wire_version;
699              
700 219 100 66     915 if ( $server_min_wire_version > $self->max_wire_version
701             || $server_max_wire_version < $self->min_wire_version ) {
702 3         8 $compat = 0;
703 3         8 push @{ $self->_incompatible_servers }, $server;
  3         66  
704             }
705              
706 219 100       480 $min_seen = $server_max_wire_version if $server_max_wire_version < $min_seen;
707 219 100       487 $max_seen = $server_max_wire_version if $server_max_wire_version > $max_seen;
708             }
709 1235         47272 $self->_set_is_compatible($compat);
710 1235         45273 $self->_set_wire_version_floor($min_seen);
711 1235         5527 $self->_set_wire_version_ceil($max_seen);
712              
713 1235         3192 return;
714             }
715              
716             sub _update_ls_timeout_minutes {
717 261     261   674 my ( $self, $new_server ) = @_;
718              
719 261         820 my @data_bearing_servers = grep { $_->is_data_bearing } $self->all_servers;
  398         9354  
720             my $timeout = min map {
721             # use -1 as a flag to prevent undefined warnings
722 261 100       6314 defined $_->logical_session_timeout_minutes
  155         3372  
723             ? $_->logical_session_timeout_minutes
724             : -1
725             } @data_bearing_servers;
726             # min will return undef if the array is empty
727 261 100 100     6465 if ( defined $timeout && $timeout < 0 ) {
728 117         194 $timeout = undef;
729             }
730 261         5030 $self->_set_logical_session_timeout_minutes( $timeout );
731 261         14758 return;
732             }
733              
734             sub _supports_sessions {
735 0     0   0 my ( $self ) = @_;
736              
737 0 0       0 $self->scan_all_servers if $self->stale;
738              
739 0         0 my @servers = $self->all_servers;
740 0 0 0     0 return if @servers == 1 && $servers[0]->type eq 'Standalone';
741              
742 0         0 return defined $self->logical_session_timeout_minutes;
743             }
744              
745             sub _supports_transactions {
746 0     0   0 my ( $self ) = @_;
747              
748 0 0       0 return unless $self->_supports_sessions;
749 0 0       0 return $self->_supports_mongos_pinning_transactions if $self->type eq 'Sharded';
750 0 0       0 return if $self->wire_version_ceil < 7;
751 0         0 return 1;
752             }
753              
754             # Seperated out so can be used in dispatch logic
755             sub _supports_mongos_pinning_transactions {
756 0     0   0 my ( $self ) = @_;
757              
758             # Separated out so that it doesnt return 1 for wire version 8 non sharded
759 0 0       0 return if $self->wire_version_ceil < 8;
760             # This extra sharded check is required so this test can be standalone
761 0 0       0 return if $self->type ne 'Sharded';
762 0         0 return 1;
763             }
764              
765             sub _check_staleness_compatibility {
766 76     76   131 my ($self, $read_pref) = @_;
767 76 50       183 my $max_staleness_sec = $read_pref ? $read_pref->max_staleness_seconds : -1;
768              
769 76 100       185 if ( $max_staleness_sec > 0 ) {
770 37 100       106 if ( $self->wire_version_floor < 5 ) {
771 5         51 MongoDB::ProtocolError->throw(
772             "Incompatible wire protocol version. You tried to use max_staleness_seconds with one or more servers that don't support it."
773             );
774             }
775              
776 32 100 100     223 if (
      100        
777             ( $self->type eq "ReplicaSetWithPrimary" || $self->type eq "ReplicaSetNoPrimary" )
778             && $max_staleness_sec < max( SMALLEST_MAX_STALENESS_SEC,
779             $self->heartbeat_frequency_sec + IDLE_WRITE_PERIOD_SEC
780             )
781             )
782             {
783 3         30 MongoDB::UsageError->throw(
784             "max_staleness_seconds must be at least 90 seconds and at least heartbeat_frequency (in secs) + 10 secs"
785             );
786             }
787             }
788              
789 68         118 return;
790             }
791              
792             sub _dump {
793 0     0   0 my ($self) = @_;
794 0         0 print $self->_status_string . "\n";
795             }
796              
797             sub _eligible {
798 42     42   93 my ( $self, $read_pref, @candidates ) = @_;
799              
800             # must filter on max staleness first, so that the remaining servers
801             # are checked against the list of tag_sets
802 42 100       116 if ( $read_pref->max_staleness_seconds > 0 ) {
803 19         50 @candidates = $self->_filter_fresh_servers($read_pref, @candidates );
804 18 100       49 return unless @candidates;
805             };
806              
807             # given a tag set list, if a tag set matches at least one
808             # candidate, then all candidates matching that tag set are eligible
809 40 100       104 if ( ! $read_pref->has_empty_tag_sets ) {
810 27         38 for my $ts ( @{ $read_pref->tag_sets } ) {
  27         64  
811 31 100       50 if ( my @ts_candidates = grep { $_->matches_tag_set($ts) } @candidates ) {
  58         141  
812 18         61 return @ts_candidates;
813             }
814             }
815 9         25 return;
816             }
817              
818 13         37 return @candidates;
819             }
820              
821             sub _filter_fresh_servers {
822 19     19   40 my ($self, $read_pref, @candidates) = @_;
823              
824             # all values should be floating point seconds
825 19         33 my $max_staleness_sec = $read_pref->max_staleness_seconds;
826 19         37 my $heartbeat_frequency_sec = $self->heartbeat_frequency_sec;
827              
828 19 100       49 if ( $self->type eq 'ReplicaSetWithPrimary' ) {
829 10         25 my ($primary) = $self->_primaries;
830              
831             # all values should be floating point seconds
832 10         220 my $p_last_write_date = $primary->last_write_date;
833 10         490 my $p_last_update_time = $primary->last_update_time;
834              
835 16         52 return map { $_->[0] }
836 26         481 grep { $_->[1] <= $max_staleness_sec }
837             map {
838 10         24 [
839 26         939 $_,
840             $p_last_write_date
841             + ( $_->last_update_time - $p_last_update_time )
842             - $_->last_write_date
843             + $heartbeat_frequency_sec
844             ]
845             } @candidates;
846             }
847             else {
848 23         39 my ($smax) = map { $_->[0] }
849 19         394 sort { $b->[1] <=> $a->[1] }
850 9         21 map { [ $_, $_->last_write_date ] } $self->_secondaries;
  23         1144  
851 9         161 my $smax_last_write_date = $smax->last_write_date;
852              
853 15         45 return map { $_->[0] }
854 23         100 grep { $_->[1] <= $max_staleness_sec }
855             map {
856 8         64 [ $_, $smax_last_write_date - $_->last_write_date + $heartbeat_frequency_sec ]
  23         435  
857             } @candidates;
858             }
859             }
860              
861             # This works for reads and writes; for writes, $read_pref will be undef
862             sub _find_available_server {
863 1097     1097   6520 my ( $self, $read_pref, @candidates ) = @_;
864 1097 100       4406 $self->_check_staleness_compatibility($read_pref) if $read_pref;
865 1096 50       5893 push @candidates, $self->all_servers unless @candidates;
866 1096         5157 my $selector = $self->server_selector;
867             return $self->_get_server_in_latency_window(
868 1096 100       6024 [ grep { $_->is_available }
  1151         24536  
869             $selector ? $selector->(@candidates) : @candidates ]
870             );
871             }
872              
873             # This uses read preference to check for max staleness compatibility in
874             # mongos, but otherwise read preference is ignored (mongos will pass it on)
875             sub _find_readable_mongos_server {
876 7     7   53 my ( $self, $read_pref, @candidates ) = @_;
877 7         21 $self->_check_staleness_compatibility($read_pref);
878 6 50       21 push @candidates, $self->all_servers unless @candidates;
879 6         17 my $selector = $self->server_selector;
880             return $self->_get_server_in_latency_window(
881 6 50       18 [ grep { $_->is_available }
  12         432  
882             $selector ? $selector->(@candidates) : @candidates ]
883             );
884             }
885              
886             sub _find_nearest_server {
887 23     23   231 my ( $self, $read_pref, @candidates ) = @_;
888 23         74 $self->_check_staleness_compatibility($read_pref);
889 18 50       71 push @candidates, ( $self->_primaries, $self->_secondaries ) unless @candidates;
890 18         164 my @suitable = $self->_eligible( $read_pref, @candidates );
891 17         50 my $selector = $self->server_selector;
892 17 50       75 return $self->_get_server_in_latency_window(
893             [ $selector ? $selector->(@suitable) : @suitable ]
894             );
895             }
896              
897             sub _find_primary_server {
898 18     18   93 my ( $self, undef, @candidates ) = @_;
899 18 50       55 return $self->current_primary
900             if $self->current_primary;
901 18 50       58 push @candidates, $self->all_servers unless @candidates;
902 18     30   106 return first { $_->is_writable } @candidates;
  30         768  
903             }
904              
905             sub _find_primarypreferred_server {
906 8     8   78 my ( $self, $read_pref, @candidates ) = @_;
907 8         26 $self->_check_staleness_compatibility($read_pref);
908 7   100     22 return $self->_find_primary_server(@candidates)
909             || $self->_find_secondary_server( $read_pref, @candidates );
910             }
911              
912             sub _find_secondary_server {
913 24     24   192 my ( $self, $read_pref, @candidates ) = @_;
914 24         65 $self->_check_staleness_compatibility($read_pref);
915 24 50       79 push @candidates, $self->_secondaries unless @candidates;
916 24         207 my @suitable = $self->_eligible( $read_pref, @candidates );
917 24         57 my $selector = $self->server_selector;
918 24 50       89 return $self->_get_server_in_latency_window(
919             [ $selector ? $selector->(@suitable) : @suitable ]
920             );
921             }
922              
923             sub _find_secondarypreferred_server {
924 11     11   107 my ( $self, $read_pref, @candidates ) = @_;
925 11         36 $self->_check_staleness_compatibility($read_pref);
926 11   100     29 return $self->_find_secondary_server( $read_pref, @candidates )
927             || $self->_find_primary_server(@candidates);
928             }
929              
930             sub _get_server_in_latency_window {
931 1143     1143   16103 my ( $self, $servers ) = @_;
932 1143 100       5768 return unless @$servers;
933 52 100       165 return $servers->[0] if @$servers == 1;
934              
935             # order servers by RTT EWMA
936 32         65 my $rtt_hash = $self->rtt_ewma_sec;
937             my @sorted =
938 83         208 sort { $a->{rtt} <=> $b->{rtt} }
939 32         62 map { { server => $_, rtt => $rtt_hash->{ $_->address } } } @$servers;
  114         365  
940             # lowest RTT is always in the windows
941 32         62 my @in_window = shift @sorted;
942              
943             # add any other servers in window and return a random one
944 32         83 my $max_rtt = $in_window[0]->{rtt} + $self->local_threshold_sec;
945 32         60 push @in_window, grep { $_->{rtt} <= $max_rtt } @sorted;
  82         156  
946 32         221 return $in_window[ int( rand(@in_window) ) ]->{server};
947             }
948              
949             my $PRIMARY = MongoDB::ReadPreference->new;
950             my $PRIMARY_PREF = MongoDB::ReadPreference->new( mode => 'primaryPreferred' );
951              
952             sub _ping_server {
953 0     0   0 my ($self, $link) = @_;
954 0         0 return eval {
955 0         0 my $op = MongoDB::Op::_Command->_new(
956             db_name => 'admin',
957             query => [ping => 1],
958             query_flags => {},
959             bson_codec => $self->bson_codec,
960             read_preference => $PRIMARY_PREF,
961             monitoring_callback => $self->monitoring_callback,
962             );
963 0         0 $op->execute( $link )->output;
964             };
965             }
966              
967              
968             sub _get_server_link {
969 0     0   0 my ( $self, $server, $method, $read_pref ) = @_;
970 0         0 my $address = $server->address;
971 0         0 my $link = $self->links->{$address};
972              
973             # if no link, make a new connection or give up
974 0 0 0     0 $link = $self->_initialize_link($address) unless $link && $link->connected;
975 0 0       0 return unless $link;
976              
977             # for idle links, refresh the server and verify validity
978 0 0       0 if ( time - $link->last_used > $self->socket_check_interval_sec ) {
979 0 0       0 return $link if $self->_ping_server;
980 0         0 $self->mark_server_unknown(
981             $server, 'Lost connection with the server'
982             );
983 0         0 $self->check_address($address);
984              
985             # topology might have dropped the server
986 0 0       0 $server = $self->servers->{$address}
987             or return;
988              
989 0         0 my $fresh_link = $self->links->{$address};
990 0 0       0 return $fresh_link if !$method;
991              
992             # verify selection criteria
993 0 0       0 return $self->$method( $read_pref, $server ) ? $fresh_link : undef;
994             }
995              
996 0         0 return $link;
997             }
998              
999             sub _initialize_link {
1000 108     108   426 my ( $self, $address ) = @_;
1001              
1002             my $link = eval {
1003 108         311 MongoDB::_Link->new( %{$self->link_options}, address => $address )->connect;
  108         3054  
1004 108 50       347 } or do {
1005 108   50     653 my $error = $@ || "Unknown error";
1006             # if connection failed, update topology with Unknown description
1007 108         764 $self->_reset_address_to_unknown( $address, $error );
1008             };
1009              
1010 108 50       1242 return unless $link;
1011              
1012             # connection succeeded, so register link and get a server description
1013 0         0 $self->links->{$address} = $link;
1014 0         0 $self->_update_topology_from_link( $link, with_handshake => 1 );
1015              
1016             # after update, server might or might not exist in the topology;
1017             # if not, return nothing
1018 0 0       0 return unless my $server = $self->servers->{$address};
1019              
1020             # we have a link and the server is a valid member, so
1021             # try to authenticate; if authentication fails, all
1022             # servers are considered invalid and we throw an error
1023 0 0 0 0   0 if ( $self->type eq 'Single' || first { $_ eq $server->type } qw/Standalone Mongos RSPrimary RSSecondary/ ) {
  0         0  
1024             eval {
1025 0         0 $self->credential->authenticate($server, $link, $self->bson_codec);
1026 0         0 1;
1027 0 0       0 } or do {
1028 0         0 my $err = $@;
1029 0 0       0 my $msg = $err->$_isa("MongoDB::Error") ? $err->message : "$err";
1030 0         0 $self->_reset_address_to_unknown( $_->address, $err ) for $self->all_servers;
1031 0         0 MongoDB::AuthError->throw("Authentication to $address failed: $msg");
1032             };
1033             }
1034              
1035 0         0 return $link;
1036             }
1037              
1038             sub _primaries {
1039 203     203   473 return grep { $_->type eq 'RSPrimary' } $_[0]->all_servers;
  424         8648  
1040             }
1041              
1042             sub _remove_address {
1043 225     225   912 my ( $self, $address ) = @_;
1044 225 50 33     1101 if ( $self->current_primary && $self->current_primary->address eq $address ) {
1045 0         0 $self->_clear_current_primary;
1046             }
1047 225         2079 delete $self->$_->{$address} for qw/servers links rtt_ewma_sec/;
1048 225 100       1081 $self->publish_server_closing( $address )
1049             if $self->monitoring_callback;
1050 225         2055 return;
1051             }
1052              
1053             sub _remove_server {
1054 13     13   38 my ( $self, $server ) = @_;
1055 13         57 $self->_remove_address( $server->address );
1056 13         25 return;
1057             }
1058              
1059             sub _reset_address_to_unknown {
1060 118     118   540 my ( $self, $address, $error, $update_time ) = @_;
1061 118   66     1007 $update_time //= time;
1062              
1063 118         629 $self->_remove_address($address);
1064 118         604 my $desc = $self->_add_address_as_unknown( $address, $update_time, $error );
1065 118         4670 $self->_update_topology_from_server_desc($address, $desc);
1066              
1067 118         339 return;
1068             }
1069              
1070             sub _secondaries {
1071 51     51   263 return grep { $_->type eq 'RSSecondary' } $_[0]->all_servers;
  129         2317  
1072             }
1073              
1074             sub _status_string {
1075 54     54   239 my ($self) = @_;
1076 54         289 my $status = '';
1077 54 50       478 if ( $self->type =~ /^Replica/ ) {
1078 0         0 $status .= sprintf( "Topology type: %s; Set name: %s, Member status:\n",
1079             $self->type, $self->replica_set_name );
1080             }
1081             else {
1082 54         957 $status .= sprintf( "Topology type: %s; Member status:\n", $self->type );
1083             }
1084              
1085 54         332 $status .= join( "\n", map { " $_" } map { $_->status_string } $self->all_servers ) . "\n";
  54         490  
  54         393  
1086 54         565 return $status;
1087             }
1088              
1089             # this implements the server selection timeout around whatever actual method
1090             # is used for returning a link
1091             sub _selection_timeout {
1092 54     54   231 my ( $self, $method, $read_pref ) = @_;
1093              
1094 54         265 my $start_time = my $loop_end_time = time();
1095 54         304 my $max_time = $start_time + $self->server_selection_timeout_sec;
1096              
1097 54 50       428 if ( $self->next_scan_time < $start_time ) {
1098 0         0 $self->_set_stale(1);
1099             }
1100              
1101 54         169 while (1) {
1102 1134 100       5807 if ( $self->stale ) {
1103 1080         5126 my $scan_ready_time = $self->last_scan_time + MIN_HEARTBEAT_FREQUENCY_SEC;
1104              
1105             # if not enough time left to wait to check; then caller throws error
1106 1080 100 66     10372 return if !$self->server_selection_try_once && $scan_ready_time > $max_time;
1107              
1108             # loop_end_time is a proxy for time() to avoid overhead
1109 1026         3303 my $sleep_time = $scan_ready_time - $loop_end_time;
1110              
1111 1026 50       512768734 usleep( 1e6 * $sleep_time ) if $sleep_time > 0;
1112              
1113 1026         32093 $self->scan_all_servers;
1114             }
1115              
1116 1080 50       7066 unless ( $self->is_compatible ) {
1117 0         0 $self->_set_stale(1);
1118 0         0 my $error_string = '';
1119 0         0 for my $server ( @{ $self->_incompatible_servers } ) {
  0         0  
1120 0         0 my $min_wire_ver = $server->is_master->{minWireVersion};
1121 0         0 my $max_wire_ver = $server->is_master->{maxWireVersion};
1122 0         0 my $host = $server->address;
1123 0 0       0 if ( $min_wire_ver > $self->max_wire_version ) {
1124 0         0 $error_string .= sprintf(
1125             "Server at %s requires wire version %d, but this version of %s only supports up to %d.\n",
1126             $host,
1127             $min_wire_ver,
1128             'Perl MongoDB Driver',
1129             $self->max_wire_version
1130             );
1131             } else {
1132 0         0 $error_string .= sprintf(
1133             "Server at %s reports wire version %d, but this version of %s requires at least %d (MongoDB %s).\n",
1134             $host,
1135             $max_wire_ver,
1136             'Perl MongoDB Driver',
1137             $self->min_wire_version,
1138             $self->min_server_version,
1139             );
1140             }
1141             }
1142 0         0 $self->_set_compatibility_error($error_string);
1143 0         0 MongoDB::ProtocolError->throw( $error_string );
1144             }
1145              
1146 1080         9386 my $server = $self->$method($read_pref);
1147              
1148 1080 50       4324 return $server if $server;
1149              
1150 1080         3972 $self->_set_stale(1);
1151 1080         4169 $loop_end_time = time();
1152              
1153 1080 50       6023 if ( $self->server_selection_try_once ) {
1154             # if already tried once; then caller throws error
1155 0 0       0 return if $self->last_scan_time > $start_time;
1156             }
1157             else {
1158             # if selection timed out; then caller throws error
1159 1080 50       4817 return if $loop_end_time > $max_time;
1160             }
1161             }
1162             }
1163              
1164             sub _generate_ismaster_request {
1165 0     0   0 my ( $self, $link, $should_perform_handshake ) = @_;
1166 0         0 my @opts;
1167 0 0       0 if ($should_perform_handshake) {
1168 0         0 push @opts, client => $self->handshake_document;
1169 0 0       0 if ( $self->credential->mechanism eq 'DEFAULT' ) {
1170 0         0 my $db_user = join( ".", map { $self->credential->$_ } qw/source username/ );
  0         0  
1171 0         0 push @opts, saslSupportedMechs => $db_user;
1172             }
1173 0 0       0 if (@{ $self->compressors }) {
  0         0  
1174 0         0 push @opts, compression => $self->compressors;
1175             }
1176             }
1177 0 0 0     0 if ( $link->supports_clusterTime && defined $self->cluster_time ) {
1178 0         0 push @opts, '$clusterTime' => $self->cluster_time;
1179             }
1180              
1181 0         0 return [ ismaster => 1, @opts ];
1182             }
1183              
1184             sub _update_topology_from_link {
1185 0     0   0 my ( $self, $link, %opts ) = @_;
1186              
1187 0 0       0 $self->publish_server_heartbeat_started( $link )
1188             if $self->monitoring_callback;
1189              
1190 0         0 my $start_time = time;
1191 0         0 my $is_master = eval {
1192             my $op = MongoDB::Op::_Command->_new(
1193             db_name => 'admin',
1194 0         0 query => $self->_generate_ismaster_request( $link, $opts{with_handshake} ),
1195             query_flags => {},
1196             bson_codec => $self->bson_codec,
1197             read_preference => $PRIMARY,
1198             monitoring_callback => $self->monitoring_callback,
1199             );
1200             # just for this command, use connect timeout as socket timeout;
1201             # this violates encapsulation, but requires less API modification
1202             # to support this specific exception to the socket timeout
1203 0         0 local $link->{socket_timeout} = $link->{connect_timeout};
1204 0         0 $op->execute( $link )->output;
1205             };
1206 0 0       0 if ( my $e = $@ ) {
1207 0         0 my $end_time_fail = time;
1208 0         0 my $rtt_sec_fail = $end_time_fail - $start_time;
1209 0 0       0 $self->publish_server_heartbeat_failed( $link, $rtt_sec_fail, $e )
1210             if $self->monitoring_callback;
1211 0 0 0     0 if ($e->$_isa("MongoDB::DatabaseError") && $e->code == USER_NOT_FOUND ) {
1212 0         0 MongoDB::AuthError->throw("mechanism negotiation error: $e");
1213             }
1214 0         0 warn "During MongoDB topology update for @{[$link->address]}: $e"
1215             if WITH_ASSERTS;
1216 0         0 $self->_reset_address_to_unknown( $link->address, $e );
1217             # retry a network error if server was previously known to us
1218 0 0 0     0 if ( $e->$_isa("MongoDB::NetworkError")
      0        
      0        
1219             and $link->server
1220             and $link->server->type ne 'Unknown'
1221             and $link->server->type ne 'PossiblePrimary' )
1222             {
1223             # the earlier reset to unknown avoids us reaching this branch again
1224             # and recursing forever
1225 0         0 $self->check_address( $link->address );
1226             }
1227 0         0 return;
1228             };
1229              
1230 0 0       0 return unless $is_master;
1231              
1232 0 0       0 if ( my $cluster_time = $is_master->{'$clusterTime'} ) {
1233 0         0 $self->update_cluster_time($cluster_time);
1234             }
1235              
1236 0         0 my $end_time = time;
1237 0         0 my $rtt_sec = $end_time - $start_time;
1238             # Protect against clock skew
1239 0 0       0 $rtt_sec = 0 if $rtt_sec < 0;
1240              
1241 0 0       0 $self->publish_server_heartbeat_succeeded( $link, $rtt_sec, $is_master )
1242             if $self->monitoring_callback;
1243              
1244 0         0 my $new_server = MongoDB::_Server->new(
1245             address => $link->address,
1246             last_update_time => $end_time,
1247             rtt_sec => $rtt_sec,
1248             is_master => $is_master,
1249             compressor => $self->_construct_compressor($is_master),
1250             );
1251              
1252 0         0 $self->_update_topology_from_server_desc( $link->address, $new_server );
1253              
1254 0         0 return;
1255             }
1256              
1257             # find suitable compressor
1258             #
1259             # implemented here because the result is based on the specified
1260             # order of compressors combined with the list of server supported
1261             # compressors
1262             sub _construct_compressor {
1263 0     0   0 my ($self, $is_master) = @_;
1264              
1265 0 0 0     0 my @supported = @{ ($is_master || {})->{compression} || [] }
  0 0       0  
1266             or return undef; ## no critic
1267              
1268 0         0 for my $name (@{ $self->compressors }) {
  0         0  
1269 0 0       0 if (grep { $name eq $_ } @supported) {
  0         0  
1270 0         0 return MongoDB::_Protocol::get_compressor($name, {
1271             zlib_compression_level => $self->zlib_compression_level,
1272             });
1273             }
1274             }
1275              
1276 0         0 return undef; ## no critic
1277             }
1278              
1279             sub _update_topology_from_server_desc {
1280 263     263   5713 my ( $self, $address, $new_server ) = @_;
1281              
1282             # ignore spurious result not in the set; this isn't strictly necessary
1283             # for single-threaded operation, but spec tests expect it and if we
1284             # have async monitoring in the future, late responses could come back
1285             # after a server has been removed
1286 263 100       1275 return unless $self->servers->{$address};
1287              
1288 261 100       1285 $self->publish_old_topology_desc( $address, $new_server )
1289             if $self->monitoring_callback;
1290              
1291 261         3960 $self->_update_ewma( $address, $new_server );
1292              
1293             # must come after ewma update
1294 261         1150 $self->servers->{$address} = $new_server;
1295              
1296 261         1349 my $method = "_update_" . $self->type;
1297              
1298 261         1300 $self->$method( $address, $new_server );
1299              
1300             # if link is still around, tag it with server specifics
1301 261         1006 $self->_update_link_metadata( $address, $new_server );
1302              
1303 261         1017 $self->_update_ls_timeout_minutes( $new_server );
1304              
1305 261 100       1340 $self->publish_new_topology_desc if $self->monitoring_callback;
1306              
1307 261         13963 return $new_server;
1308             }
1309              
1310             sub _update_ewma {
1311 483     483   10448 my ( $self, $address, $new_server ) = @_;
1312              
1313 483 100       7916 if ( $new_server->type eq 'Unknown' ) {
1314 128         3779 delete $self->rtt_ewma_sec->{$address};
1315             }
1316             else {
1317 355         4087 my $old_avg = $self->rtt_ewma_sec->{$address};
1318 355         701 my $alpha = $self->ewma_alpha;
1319 355         678 my $rtt_sec = $new_server->rtt_sec;
1320 355 100       992 $self->rtt_ewma_sec->{$address} =
1321             defined($old_avg) ? ( $alpha * $rtt_sec + ( 1 - $alpha ) * $old_avg ) : $rtt_sec;
1322             }
1323              
1324 483         1122 return;
1325             }
1326              
1327             sub _update_link_metadata {
1328 261     261   656 my ( $self, $address, $server ) = @_;
1329              
1330             # if the link didn't get dropped from the topology during the update, we
1331             # attach the server so the link knows where it came from
1332 261 50       1184 if ( $self->links->{$address} ) {
1333 0         0 $self->links->{$address}->set_metadata($server);
1334             }
1335              
1336 261         534 return;
1337             }
1338              
1339             sub _update_rs_with_primary_from_member {
1340 17     17   44 my ( $self, $new_server ) = @_;
1341              
1342 17 100 66     357 if ( !$self->servers->{ $new_server->address }
1343             || $self->replica_set_name ne $new_server->set_name )
1344             {
1345 2         64 $self->_remove_server($new_server);
1346             }
1347              
1348             # require 'me' that matches expected address.
1349             # check is case insensitive
1350 17 50 66     716 if ( $new_server->me && lc $new_server->me ne $new_server->address ) {
1351 0         0 $self->_remove_server($new_server);
1352 0         0 $self->_check_for_primary;
1353 0         0 return;
1354             }
1355              
1356 17 100       206 if ( ! $self->_check_for_primary ) {
1357              
1358             # flag possible primary to amend scanning order
1359 1         18 my $primary = $new_server->primary;
1360 1 0 33     12 if ( length($primary)
      0        
1361             && $self->servers->{$primary}
1362             && $self->servers->{$primary}->type eq 'Unknown' )
1363             {
1364 0         0 $self->servers->{$primary}->_set_type('PossiblePrimary');
1365             }
1366             }
1367              
1368 17         35 return;
1369             }
1370              
1371             sub _update_rs_with_primary_from_primary {
1372 63     63   137 my ( $self, $new_server ) = @_;
1373              
1374 63 100       1102 if ( !length $self->replica_set_name ) {
    100          
1375 3         57 $self->_set_replica_set_name( $new_server->set_name );
1376             }
1377             elsif ( $self->replica_set_name ne $new_server->set_name ) {
1378             # We found a primary but it doesn't have the setName
1379             # provided by the user or previously discovered
1380 3         83 $self->_remove_server($new_server);
1381 3         7 return;
1382             }
1383              
1384 60         1594 my $election_id = $new_server->is_master->{electionId};
1385 60         128 my $set_version = $new_server->is_master->{setVersion};
1386 60         127 my $max_election_id = $self->max_election_id;
1387 60         994 my $max_set_version = $self->max_set_version;
1388              
1389 60 100 100     473 if ( defined $set_version && defined $election_id ) {
1390 20 100 66     142 if (
      100        
      100        
1391             defined $max_election_id
1392             && defined $max_set_version
1393             && (
1394             $max_set_version > $set_version
1395             || ( $max_set_version == $set_version
1396             && "$max_election_id" gt "$election_id" )
1397             )
1398             )
1399             {
1400             # stale primary
1401              
1402 6         66 $self->_remove_address( $new_server->address );
1403 6         25 $self->_add_address_as_unknown( $new_server->address );
1404 6         193 $self->_check_for_primary;
1405 6         17 return;
1406             }
1407 14         126 $self->_set_max_election_id( $election_id );
1408             }
1409              
1410 54 100 100     228 if ( defined $set_version
      100        
1411             && ( !defined $max_set_version || $set_version > $max_set_version ) )
1412             {
1413 14         240 $self->_set_max_set_version($set_version);
1414             }
1415              
1416             # possibly invalidate an old primary (even if more than one!)
1417 54         540 for my $old_primary ( $self->_primaries ) {
1418 64 100       611 if ( $old_primary->address ne $new_server->address ) {
1419 10         62 $self->_reset_address_to_unknown(
1420             $old_primary->address,
1421             "no longer primary; update needed",
1422             $old_primary->last_update_time
1423             );
1424             }
1425             }
1426              
1427             # unknown set members need to be added to the topology
1428             my %set_members =
1429 54         178 map { $_ => undef } map { @{ $new_server->$_ } } qw/hosts passives arbiters/;
  108         635  
  162         861  
  162         2567  
1430              
1431             $self->_add_address_as_unknown($_)
1432 54         156 for grep { !exists $self->servers->{$_} } keys %set_members;
  108         404  
1433              
1434             # topology servers no longer in the set need to be removed
1435             $self->_remove_address($_)
1436 54         746 for grep { !exists $set_members{$_} } keys %{ $self->servers };
  116         285  
  54         150  
1437              
1438 54         135 return;
1439             }
1440              
1441             sub _update_rs_without_primary {
1442 10     10   29 my ( $self, $new_server ) = @_;
1443              
1444 10 100       171 if ( !length $self->replica_set_name ) {
    100          
1445 1         20 $self->_set_replica_set_name( $new_server->set_name );
1446             }
1447             elsif ( $self->replica_set_name ne $new_server->set_name ) {
1448 2         53 $self->_remove_server($new_server);
1449 2         5 return;
1450             }
1451              
1452             # unknown set members need to be added to the topology
1453             my %set_members =
1454 8         244 map { $_ => undef } map { @{ $new_server->$_ } } qw/hosts passives arbiters/;
  18         113  
  24         132  
  24         431  
1455              
1456             $self->_add_address_as_unknown($_)
1457 8         31 for grep { !exists $self->servers->{$_} } keys %set_members;
  18         71  
1458              
1459             # require 'me' that matches expected address
1460 8 100 66     313 if ( $new_server->me && $new_server->me ne $new_server->address ) {
1461 1         40 $self->_remove_server($new_server);
1462 1         3 return;
1463             }
1464              
1465             # flag possible primary to amend scanning order
1466 7         171 my $primary = $new_server->primary;
1467 7 50 66     109 if ( length($primary)
      33        
1468             && $self->servers->{$primary}
1469             && $self->servers->{$primary}->type eq 'Unknown' )
1470             {
1471 2         72 $self->servers->{$primary}->_set_type('PossiblePrimary');
1472             }
1473              
1474 7         78 return;
1475             }
1476              
1477             #--------------------------------------------------------------------------#
1478             # update methods by topology types: behavior in each depends on new server
1479             # type received
1480             #--------------------------------------------------------------------------#
1481              
1482             sub _update_ReplicaSetNoPrimary {
1483 53     53   119 my ( $self, $address, $new_server ) = @_;
1484              
1485 53         875 my $server_type = $new_server->type;
1486              
1487 53 100       415 if ( $server_type eq 'RSPrimary' ) {
    100          
    100          
1488 41         676 $self->_set_type('ReplicaSetWithPrimary');
1489 41         1061 $self->_update_rs_with_primary_from_primary($new_server);
1490             # topology changes might have removed all primaries
1491 41         115 $self->_check_for_primary;
1492             }
1493 36         94 elsif ( grep { $server_type eq $_ } qw/RSSecondary RSArbiter RSOther/ ) {
1494 9         33 $self->_update_rs_without_primary($new_server);
1495             }
1496 6         20 elsif ( grep { $server_type eq $_ } qw/Standalone Mongos/ ) {
1497 2         12 $self->_remove_server($new_server);
1498             }
1499             else {
1500             # Unknown or RSGhost are no-ops
1501             }
1502              
1503 53         110 return;
1504             }
1505              
1506             sub _update_ReplicaSetWithPrimary {
1507 54     54   140 my ( $self, $address, $new_server ) = @_;
1508              
1509 54         1007 my $server_type = $new_server->type;
1510              
1511 54 100       455 if ( $server_type eq 'RSPrimary' ) {
    100          
    100          
1512 19         52 $self->_update_rs_with_primary_from_primary($new_server);
1513             }
1514 105         260 elsif ( grep { $server_type eq $_ } qw/RSSecondary RSArbiter RSOther/ ) {
1515 17         60 $self->_update_rs_with_primary_from_member($new_server);
1516             }
1517 54         116 elsif ( grep { $server_type eq $_ } qw/Unknown Standalone Mongos/ ) {
1518 16 100       52 $self->_remove_server($new_server)
1519             unless $server_type eq 'Unknown';
1520             }
1521             else {
1522             # RSGhost is no-op
1523             }
1524              
1525             # topology changes might have removed all primaries
1526 54         149 $self->_check_for_primary;
1527              
1528 54         101 return;
1529             }
1530              
1531             sub _update_Sharded {
1532 11     11   25 my ( $self, $address, $new_server ) = @_;
1533              
1534 11         231 my $server_type = $new_server->type;
1535              
1536 11 100       87 if ( grep { $server_type eq $_ } qw/Unknown Mongos/ ) {
  22         66  
1537             # no-op
1538             }
1539             else {
1540 1         7 $self->_remove_server($new_server);
1541             }
1542              
1543 11         23 return;
1544             }
1545              
1546             sub _update_Single {
1547 76     76   295 my ( $self, $address, $new_server ) = @_;
1548             # Per the spec, TopologyType Single never changes type or membership
1549 76         205 return;
1550             }
1551              
1552             # Direct mode is like Unknown, except that it switches only between Sharded
1553             # or Single based on the response.
1554             sub _update_Direct {
1555 54     54   196 my ( $self, $address, $new_server ) = @_;
1556              
1557 54         1023 my $server_type = $new_server->type;
1558              
1559 54 50       626 if ( $server_type eq 'Mongos' ) {
1560 0         0 $self->_set_type('Sharded');
1561 0         0 return;
1562             }
1563              
1564 54         1128 $self->_set_type('Single');
1565 54         1886 return;
1566             }
1567              
1568             sub _update_Unknown {
1569 13     13   26 my ( $self, $address, $new_server ) = @_;
1570              
1571 13         256 my $server_type = $new_server->type;
1572              
1573             # Starting from topology type 'unknown', a standalone server when we
1574             # were given multiple seeds must be a replica set member in maintenance
1575             # mode so we drop it and will rediscover it later.
1576 13 100       117 if ( $server_type eq 'Standalone' ) {
1577 2 50       36 if ( $self->number_of_seeds > 1 ) {
1578 2         63 $self->_remove_address($address);
1579             }
1580             else {
1581 0         0 $self->_set_type('Single');
1582             }
1583 2         6 return;
1584             }
1585              
1586 11 100       30 if ( $server_type eq 'Mongos' ) {
1587 7         112 $self->_set_type('Sharded');
1588 7         170 return;
1589             }
1590              
1591 4 100       18 if ( $server_type eq 'RSPrimary' ) {
    50          
1592 3         61 $self->_set_type('ReplicaSetWithPrimary');
1593 3         97 $self->_update_rs_with_primary_from_primary($new_server);
1594             # topology changes might have removed all primaries
1595 3         13 $self->_check_for_primary;
1596             }
1597 3         10 elsif ( grep { $server_type eq $_ } qw/RSSecondary RSArbiter RSOther/ ) {
1598 1         20 $self->_set_type('ReplicaSetNoPrimary');
1599 1         30 $self->_update_rs_without_primary($new_server);
1600             }
1601             else {
1602             # Unknown or RSGhost are no-ops
1603             }
1604              
1605 4         8 return;
1606             }
1607              
1608             1;
1609              
1610             # vim: ts=4 sts=4 sw=4 et: