File Coverage

blib/lib/MongoDB/Op/_ChangeStream.pm
Criterion Covered Total %
statement 30 58 51.7
branch 0 32 0.0
condition 0 14 0.0
subroutine 10 11 90.9
pod 0 1 0.0
total 40 116 34.4


line stmt bran cond sub pod time code
1             # Copyright 2018 - present MongoDB, Inc.
2             #
3             # Licensed under the Apache License, Version 2.0 (the "License");
4             # you may not use this file except in compliance with the License.
5             # You may obtain a copy of the License at
6             #
7             # http://www.apache.org/licenses/LICENSE-2.0
8             #
9             # Unless required by applicable law or agreed to in writing, software
10             # distributed under the License is distributed on an "AS IS" BASIS,
11             # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12             # See the License for the specific language governing permissions and
13             # limitations under the License.
14              
15 59     59   396 use strict;
  59         128  
  59         1663  
16 59     59   318 use warnings;
  59         126  
  59         9553  
17             package MongoDB::Op::_ChangeStream;
18              
19             # Encapsulate changestream operation; return MongoDB::QueryResult
20             # and operationTime if supported
21              
22 59     59   406 use version;
  59         137  
  59         307  
23             our $VERSION = 'v2.2.1';
24              
25 59     59   4314 use Moo;
  59         162  
  59         378  
26              
27 59     59   18444 use boolean;
  59         197  
  59         491  
28 59     59   4313 use BSON::Timestamp;
  59         173  
  59         2144  
29 59     59   382 use MongoDB::Op::_Command;
  59         156  
  59         1966  
30 59         502 use MongoDB::_Types qw(
31             ArrayOfHashRef
32             Boolish
33             BSONTimestamp
34 59     59   368 );
  59         149  
35 59         533 use Types::Standard qw(
36             HashRef
37             InstanceOf
38             Num
39             Str
40             Maybe
41 59     59   75607 );
  59         157  
42              
43 59     59   67446 use namespace::clean;
  59         137  
  59         414  
44              
45             has client => (
46             is => 'ro',
47             required => 1,
48             isa => InstanceOf ['MongoDB::MongoClient'],
49             );
50              
51             has pipeline => (
52             is => 'ro',
53             required => 1,
54             isa => ArrayOfHashRef,
55             );
56              
57             has options => (
58             is => 'ro',
59             required => 1,
60             isa => HashRef,
61             );
62              
63             has maxAwaitTimeMS => (
64             is => 'rw',
65             isa => Num,
66             );
67              
68             has full_document => (
69             is => 'ro',
70             isa => Str,
71             predicate => 'has_full_document',
72             );
73              
74             has resume_after => (
75             is => 'ro',
76             predicate => 'has_resume_after',
77             );
78              
79             has start_after => (
80             is => 'ro',
81             predicate => 'has_start_after',
82             );
83              
84             has all_changes_for_cluster => (
85             is => 'ro',
86             isa => Boolish,
87             default => sub { 0 },
88             );
89              
90             has start_at_operation_time => (
91             is => 'ro',
92             isa => BSONTimestamp,
93             predicate => 'has_start_at_operation_time',
94             );
95              
96             with $_ for qw(
97             MongoDB::Role::_PrivateConstructor
98             MongoDB::Role::_CollectionOp
99             MongoDB::Role::_ReadOp
100             MongoDB::Role::_WriteOp
101             MongoDB::Role::_CommandCursorOp
102             );
103              
104             sub execute {
105 0     0 0   my ( $self, $link, $topology ) = @_;
106              
107 0           my $options = $self->options;
108 0           my $is_2_6 = $link->supports_write_commands;
109              
110             # maxTimeMS isn't available until 2.6 and the aggregate command
111             # will reject it as unrecognized
112 0 0         delete $options->{maxTimeMS} unless $is_2_6;
113              
114             # bypassDocumentValidation isn't available until 3.2 (wire version 4) & dont send if false
115 0 0 0       unless ($link->supports_document_validation && $options->{bypassDocumentValidation}) {
116 0           delete $options->{bypassDocumentValidation};
117             }
118              
119 0 0 0       if ( defined $options->{collation} and !$link->supports_collation ) {
120 0           MongoDB::UsageError->throw(
121             "MongoDB host '" . $link->address . "' doesn't support collation" );
122             }
123              
124             # If 'cursor' is explicitly false, we disable using cursors, even
125             # for MongoDB 2.6+. This allows users operating with a 2.6+ mongos
126             # and pre-2.6 mongod in shards to avoid fatal errors. This
127             # workaround should be removed once MongoDB 2.4 is no longer supported.
128             my $use_cursor = $is_2_6
129 0   0       && ( !exists( $options->{cursor} ) || $options->{cursor} );
130              
131             # batchSize is not a command parameter itself like other options
132 0           my $batchSize = delete $options->{batchSize};
133              
134             # If we're doing cursors, we first respect an explicit batchSize option;
135             # next we fallback to the legacy (deprecated) cursor option batchSize; finally we
136             # just give an empty document. Other than batchSize we ignore any other
137             # legacy cursor options. If we're not doing cursors, don't send any
138             # cursor option at all, as servers will choke on it.
139 0 0         if ($use_cursor) {
140 0 0         if ( defined $batchSize ) {
    0          
141 0           $options->{cursor} = { batchSize => $batchSize };
142             }
143             elsif ( ref $options->{cursor} eq 'HASH' ) {
144 0           $batchSize = $options->{cursor}{batchSize};
145 0 0         $options->{cursor} = defined($batchSize) ? { batchSize => $batchSize } : {};
146             }
147             else {
148 0           $options->{cursor} = {};
149             }
150             }
151             else {
152 0           delete $options->{cursor};
153             }
154              
155 0 0 0       if ( $self->coll_name eq 1 && ! $link->supports_db_aggregation ) {
156 0           MongoDB::Error->throw(
157             "Calling aggregate with a collection name of '1' is not supported on Wire Version < 6" );
158             }
159              
160             my @pipeline = (
161             {'$changeStream' => {
162             ($self->has_start_at_operation_time
163             ? (startAtOperationTime => $self->start_at_operation_time)
164             : ()
165             ),
166             ($self->all_changes_for_cluster
167             ? (allChangesForCluster => true)
168             : ()
169             ),
170             ($self->has_full_document
171             ? (fullDocument => $self->full_document)
172             : ()
173             ),
174             ($self->has_resume_after
175             ? (resumeAfter => $self->resume_after)
176             : ()
177             ),
178             ($self->has_start_after
179             ? (startAfter => $self->start_after)
180             : ()
181             ),
182             }},
183 0 0         @{ $self->pipeline },
  0 0          
    0          
    0          
    0          
184             );
185              
186             my @command = (
187             aggregate => $self->coll_name,
188             pipeline => \@pipeline,
189             %$options,
190             $link->supports_read_concern
191 0 0         ? @{ $self->read_concern->as_args( $self->session) }
  0            
192             : (),
193             );
194              
195 0           my $op = MongoDB::Op::_Command->_new(
196             db_name => $self->db_name,
197             query => Tie::IxHash->new(@command),
198             query_flags => {},
199             bson_codec => $self->bson_codec,
200             read_preference => $self->read_preference,
201             session => $self->session,
202             monitoring_callback => $self->monitoring_callback,
203             );
204              
205 0           my $res = $op->execute( $link, $topology );
206              
207             # Fake up a single-batch cursor if we didn't get a cursor response.
208             # We use the 'results' fields as the first (and only) batch
209 0 0         if ( !$res->output->{cursor} ) {
210             $res->output->{cursor} = {
211             ns => '',
212             id => 0,
213 0   0       firstBatch => ( delete $res->output->{result} ) || [],
214             postBatchResumeToken => 0,
215             };
216             }
217              
218             return {
219             result => $self->_build_result_from_cursor($res),
220             $link->supports_4_0_changestreams
221             ? (operationTime => $res->output->{operationTime})
222 0 0         : (),
223             };
224             }
225              
226             1;