File Coverage

blib/lib/MongoDB/ChangeStream.pm
Criterion Covered Total %
statement 36 73 49.3
branch 0 26 0.0
condition 0 11 0.0
subroutine 12 16 75.0
pod 2 3 66.6
total 50 129 38.7


line stmt bran cond sub pod time code
1             # Copyright 2018 - present MongoDB, Inc.
2             #
3             # Licensed under the Apache License, Version 2.0 (the "License");
4             # you may not use this file except in compliance with the License.
5             # You may obtain a copy of the License at
6             #
7             # http://www.apache.org/licenses/LICENSE-2.0
8             #
9             # Unless required by applicable law or agreed to in writing, software
10             # distributed under the License is distributed on an "AS IS" BASIS,
11             # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12             # See the License for the specific language governing permissions and
13             # limitations under the License.
14              
15 58     58   421 use strict;
  58         135  
  58         1706  
16 58     58   307 use warnings;
  58         141  
  58         1934  
17             package MongoDB::ChangeStream;
18              
19             # ABSTRACT: A stream providing update information for collections.
20              
21 58     58   306 use version;
  58         116  
  58         332  
22             our $VERSION = 'v2.2.0';
23              
24 58     58   4870 use Moo;
  58         166  
  58         374  
25 58     58   20875 use MongoDB::Cursor;
  58         170  
  58         1500  
26 58     58   24948 use MongoDB::Op::_ChangeStream;
  58         227  
  58         2569  
27 58     58   507 use MongoDB::Error;
  58         128  
  58         7124  
28 58     58   424 use Safe::Isa;
  58         176  
  58         7539  
29 58     58   420 use BSON::Timestamp;
  58         135  
  58         1806  
30 58         370 use MongoDB::_Types qw(
31             MongoDBCollection
32             ArrayOfHashRef
33             Boolish
34             BSONTimestamp
35             ClientSession
36 58     58   363 );
  58         135  
37 58         291 use Types::Standard qw(
38             InstanceOf
39             HashRef
40             Maybe
41             Str
42             Num
43 58     58   87290 );
  58         197  
44              
45 58     58   67537 use namespace::clean -except => 'meta';
  58         153  
  58         368  
46              
47             has _result => (
48             is => 'rw',
49             isa => InstanceOf['MongoDB::QueryResult'],
50             init_arg => undef,
51             );
52              
53             has _client => (
54             is => 'ro',
55             isa => InstanceOf['MongoDB::MongoClient'],
56             init_arg => 'client',
57             required => 1,
58             );
59              
60             has _op_args => (
61             is => 'ro',
62             isa => HashRef,
63             init_arg => 'op_args',
64             required => 1,
65             );
66              
67             has _pipeline => (
68             is => 'ro',
69             isa => ArrayOfHashRef,
70             init_arg => 'pipeline',
71             required => 1,
72             );
73              
74             has _full_document => (
75             is => 'ro',
76             isa => Str,
77             init_arg => 'full_document',
78             predicate => '_has_full_document',
79             );
80              
81             has _resume_after => (
82             is => 'ro',
83             init_arg => 'resume_after',
84             predicate => '_has_resume_after',
85             );
86              
87             has _start_after => (
88             is => 'ro',
89             init_arg => 'start_after',
90             predicate => '_has_start_after',
91             );
92              
93             has _all_changes_for_cluster => (
94             is => 'ro',
95             isa => Boolish,
96             init_arg => 'all_changes_for_cluster',
97             default => sub { 0 },
98             );
99              
100             has _start_at_operation_time => (
101             is => 'ro',
102             isa => BSONTimestamp,
103             init_arg => 'start_at_operation_time',
104             predicate => '_has_start_at_operation_time',
105             coerce => sub {
106             ref($_[0]) ? $_[0] : BSON::Timestamp->new(seconds => $_[0])
107             },
108             );
109              
110             has _session => (
111             is => 'ro',
112             isa => Maybe[ClientSession],
113             init_arg => 'session',
114             );
115              
116             has _options => (
117             is => 'ro',
118             isa => HashRef,
119             init_arg => 'options',
120             default => sub { {} },
121             );
122              
123             has _max_await_time_ms => (
124             is => 'ro',
125             isa => Num,
126             init_arg => 'max_await_time_ms',
127             predicate => '_has_max_await_time_ms',
128             );
129              
130             has _last_operation_time => (
131             is => 'rw',
132             init_arg => undef,
133             predicate => '_has_last_operation_time',
134             );
135              
136             has _last_resume_token => (
137             is => 'rw',
138             init_arg => undef,
139             predicate => '_has_last_resume_token',
140             );
141              
142             sub BUILD {
143 0     0 0   my ($self) = @_;
144              
145             # starting point is construction time instead of first next call
146 0           $self->_execute_query;
147             }
148              
149             sub _execute_query {
150 0     0     my ($self) = @_;
151              
152 0           my $resume_opt = {};
153              
154             # seen prior results, continuing after last resume token
155 0 0         if ($self->_has_last_resume_token) {
    0          
    0          
156 0           $resume_opt->{resume_after} = $self->_last_resume_token;
157             }
158             elsif ( $self->_has_start_after ) {
159             $self->_last_resume_token(
160 0           $resume_opt->{start_after} = $self->_start_after
161             );
162             }
163             # no results yet, but we have operation time from prior query
164             elsif ($self->_has_last_operation_time) {
165 0           $resume_opt->{start_at_operation_time} = $self->_last_operation_time;
166             }
167             # no results and no prior operation time, send specified options
168             else {
169 0 0         $resume_opt->{start_at_operation_time} = $self->_start_at_operation_time
170             if $self->_has_start_at_operation_time;
171 0 0         if ( $self->_has_resume_after ) {
172             $self->_last_resume_token(
173 0           $resume_opt->{resume_after} = $self->_resume_after
174             );
175             }
176             }
177              
178             my $op = MongoDB::Op::_ChangeStream->new(
179             pipeline => $self->_pipeline,
180             all_changes_for_cluster => $self->_all_changes_for_cluster,
181             session => $self->_session,
182             options => $self->_options,
183             client => $self->_client,
184             $self->_has_full_document
185             ? (full_document => $self->_full_document)
186             : (),
187             $self->_has_max_await_time_ms
188             ? (maxAwaitTimeMS => $self->_max_await_time_ms)
189             : (),
190             %$resume_opt,
191 0 0         %{ $self->_op_args },
  0 0          
192             );
193              
194 0           my $res = $self->_client->send_retryable_read_op($op);
195 0           $self->_result($res->{result});
196             $self->_last_operation_time($res->{operationTime})
197 0 0         if exists $res->{operationTime};
198             }
199              
200             #pod =head1 STREAM METHODS
201             #pod
202             #pod =cut
203              
204             #pod =head2 next
205             #pod
206             #pod $change_stream = $collection->watch(...);
207             #pod $change = $change_stream->next;
208             #pod
209             #pod Waits for the next change in the collection and returns it.
210             #pod
211             #pod B: This method will wait for the amount of milliseconds passed
212             #pod as C to L or the server's
213             #pod default wait-time. It will not wait indefinitely.
214             #pod
215             #pod =cut
216              
217             sub next {
218 0     0 1   my ($self) = @_;
219              
220 0           my $change;
221             my $retried;
222 0           while (1) {
223             last if eval {
224 0           $change = $self->_result->next;
225 0           1; # successfully fetched result
226 0 0 0       } or do {
227 0   0       my $error = $@ || "Unknown error";
228 0 0 0       if (
      0        
229             not($retried)
230             and $error->$_isa('MongoDB::Error')
231             and $error->_is_resumable
232             ) {
233 0           $retried = 1;
234 0           $self->_execute_query;
235             }
236             else {
237 0           die $error;
238             }
239 0           0; # failed, cursor was rebuilt
240             };
241             }
242              
243             # this differs from drivers that block indefinitely. we have to
244             # deal with the situation where no results are available.
245 0 0         if (not defined $change) {
246 0           return undef; ## no critic
247             }
248              
249 0 0         if (exists $change->{'postBatchResumeToken'}) {
    0          
250 0           $self->_last_resume_token( $change->{'postBatchResumeToken'} );
251 0           return $change;
252             }
253             elsif (exists $change->{_id}) {
254 0           $self->_last_resume_token( $change->{_id} );
255 0           return $change;
256             }
257             else {
258 0           MongoDB::InvalidOperationError->throw(
259             "Cannot provide resume functionality when the ".
260             "resume token is missing");
261             }
262             }
263              
264             #pod =head2 get_resume_token
265             #pod
266             #pod Users can inspect the C<_id> on each C to use as a
267             #pod resume token. But since MongoDB 4.2, C and C responses
268             #pod also include a C. Drivers use one or the other
269             #pod when automatically resuming.
270             #pod
271             #pod This method retrieves the same resume token that would be used to
272             #pod automatically resume. Users intending to store the resume token
273             #pod should use this method to get the most up to date resume token.
274             #pod
275             #pod For instance:
276             #pod
277             #pod if ($local_change) {
278             #pod process_change($local_change);
279             #pod }
280             #pod
281             #pod eval {
282             #pod my $change_stream = $coll->watch([], { resumeAfter => $local_resume_token });
283             #pod while ( my $change = $change_stream->next) {
284             #pod $local_resume_token = $change_stream->get_resume_token;
285             #pod $local_change = $change;
286             #pod process_change($local_change);
287             #pod }
288             #pod };
289             #pod if (my $err = $@) {
290             #pod $log->error($err);
291             #pod }
292             #pod
293             #pod In this case the current change is always persisted locally,
294             #pod including the resume token, such that on restart the application
295             #pod can still process the change while ensuring that the change stream
296             #pod continues from the right logical time in the oplog. It is the
297             #pod application's responsibility to ensure that C is
298             #pod idempotent, this design merely makes a reasonable effort to process
299             #pod each change at least once.
300             #pod
301             #pod =cut
302              
303 0     0 1   sub get_resume_token { $_[0]->_last_resume_token }
304              
305             1;
306              
307             __END__