File Coverage

blib/lib/MongoDB/Op/_BulkWrite.pm
Criterion Covered Total %
statement 51 189 26.9
branch 0 72 0.0
condition 0 28 0.0
subroutine 17 25 68.0
pod 0 2 0.0
total 68 316 21.5


line stmt bran cond sub pod time code
1             # Copyright 2014 - present MongoDB, Inc.
2             #
3             # Licensed under the Apache License, Version 2.0 (the "License");
4             # you may not use this file except in compliance with the License.
5             # You may obtain a copy of the License at
6             #
7             # http://www.apache.org/licenses/LICENSE-2.0
8             #
9             # Unless required by applicable law or agreed to in writing, software
10             # distributed under the License is distributed on an "AS IS" BASIS,
11             # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12             # See the License for the specific language governing permissions and
13             # limitations under the License.
14              
15 58     58   475 use strict;
  58         141  
  58         1876  
16 58     58   332 use warnings;
  58         130  
  58         2491  
17             package MongoDB::Op::_BulkWrite;
18              
19             # Encapsulate a multi-document multi-operation write; returns a
20             # MongoDB::BulkWriteResult object
21              
22 58     58   325 use version;
  58         127  
  58         368  
23             our $VERSION = 'v2.2.0';
24              
25 58     58   4971 use Moo;
  58         137  
  58         422  
26              
27 58     58   19263 use MongoDB::Error;
  58         147  
  58         7398  
28 58     58   25084 use MongoDB::BulkWriteResult;
  58         221  
  58         2249  
29 58     58   516 use MongoDB::UnacknowledgedResult;
  58         177  
  58         1472  
30 58     58   29245 use MongoDB::Op::_InsertOne;
  58         213  
  58         2072  
31 58     58   24511 use MongoDB::Op::_Update;
  58         216  
  58         2727  
32 58     58   24265 use MongoDB::Op::_Delete;
  58         226  
  58         2268  
33 58     58   456 use MongoDB::_Protocol;
  58         154  
  58         1146  
34 58     58   309 use MongoDB::_Constants;
  58         129  
  58         6899  
35 58         390 use MongoDB::_Types qw(
36             Boolish
37 58     58   399 );
  58         145  
38 58         287 use Types::Standard qw(
39             ArrayRef
40             InstanceOf
41 58     58   56970 );
  58         147  
42 58     58   47489 use Safe::Isa;
  58         137  
  58         7793  
43 58     58   420 use boolean;
  58         143  
  58         506  
44              
45 58     58   3621 use namespace::clean;
  58         150  
  58         313  
46              
# Ops to run, in submission order; each entry is a pair [ $type => $doc ].
has queue => ( is => 'ro', isa => ArrayRef, required => 1 );

# True when ops must be applied in queue order, stopping once a write error
# has been seen; false lets ops be regrouped by type.
has ordered => ( is => 'ro', isa => Boolish, required => 1 );

# Client used to dispatch the write commands built by this op.
has client => ( is => 'ro', isa => InstanceOf['MongoDB::MongoClient'], required => 1 );

# Clear this flag to force plain (non-retryable) sends even when the write
# concern is acknowledged.
has _retryable => ( is => 'rw', isa => Boolish, default => 1 );
70              
# Compose the roles this op is built from. Each role is applied with its own
# `with` call, in the order listed.
#
# NOTE(review): methods used in this file but not defined here presumably
# come from these roles -- e.g. _new (private constructor), db_name /
# coll_name / write_concern / bson_codec / session / monitoring_callback
# (collection/write op roles), _pre_encode_update / _pre_encode_insert
# (pre-encoder roles) and _maybe_bypass (bypass-validation role). Confirm
# against the role definitions.
with $_ for qw(
    MongoDB::Role::_PrivateConstructor
    MongoDB::Role::_CollectionOp
    MongoDB::Role::_WriteOp
    MongoDB::Role::_UpdatePreEncoder
    MongoDB::Role::_InsertPreEncoder
    MongoDB::Role::_BypassValidation
);
79              
# Decide whether writes should go through the retryable send path.
#
# Retrying only applies to acknowledged writes. If the write is not
# acknowledged, that (false) value is returned and _retryable is not
# consulted. Otherwise the _retryable flag's value is returned.
sub _is_retryable {
    my ($op) = @_;

    my $acknowledged = $op->_should_use_acknowledged_write;
    return $acknowledged unless $acknowledged;

    return $op->_retryable;
}
84              
# Report whether any queued update or delete op specifies a collation.
#
# Every queued entry ([ $type => $doc ]) is examined; insert ops are never
# inspected for a collation. Returns a Perl boolean (1 or '').
sub has_collation {
    my ($self) = @_;

    my $with_collation = 0;
    for my $entry ( @{ $self->queue } ) {
        my ( $kind, $spec ) = @$entry;
        next unless $kind eq "update" || $kind eq "delete";
        $with_collation++ if defined $spec->{collation};
    }

    return !!$with_collation;
}
92              
# Run the whole bulk write over $link.
#
# Steps:
#   1. Reject collation use when the server lacks support, or when the write
#      is unacknowledged.
#   2. Build one BulkWriteResult accumulator.
#   3. Split the queue into batches (order-preserving when `ordered`).
#   4. Send each batch as a write command, or as individual legacy ops when
#      the server lacks write commands.
#
# Returns an UnacknowledgedResult for unacknowledged writes. Otherwise any
# accumulated write error, then write-concern error, is asserted (thrown),
# and the BulkWriteResult is returned.
sub execute {
    my ( $self, $link ) = @_;

    Carp::confess("NO LINK") if !$link;

    if ( $self->has_collation ) {
        if ( !$link->supports_collation ) {
            MongoDB::UsageError->throw(
                "MongoDB host '" . $link->address . "' doesn't support collation" );
        }
        if ( !$self->_should_use_acknowledged_write ) {
            MongoDB::UsageError->throw(
                "Unacknowledged bulk writes that specify a collation are not allowed");
        }
    }

    my $use_write_cmd = $link->supports_write_commands;

    # Legacy write ops never yield a valid modified_count, so it starts undef
    # there. With write commands it starts at 0 so results accumulate (result
    # merging copes with a mongos that later omits it). A result is built even
    # for unacknowledged writes so bulk semantics hold; it is discarded below.
    my $result = MongoDB::BulkWriteResult->_new(
        op_count             => 0,
        batch_count          => 0,
        inserted_count       => 0,
        upserted_count       => 0,
        matched_count        => 0,
        deleted_count        => 0,
        modified_count       => ( $use_write_cmd ? 0 : undef ),
        inserted             => [],
        upserted             => [],
        write_errors         => [],
        write_concern_errors => [],
    );

    my $batcher = $self->ordered ? '_batch_ordered' : '_batch_unordered';
    my @batches = $self->$batcher( $link, $self->queue );

    my $runner =
      $use_write_cmd ? '_execute_write_command_batch' : '_execute_legacy_batch';
    for my $batch (@batches) {
        $self->$runner( $link, $batch, $result );
    }

    if ( !$self->_should_use_acknowledged_write ) {
        return MongoDB::UnacknowledgedResult->_new(
            write_errors         => [],
            write_concern_errors => [],
        );
    }

    # A write error can only still be pending here for unordered bulks.
    $result->assert_no_write_error;

    # Write-concern errors are raised once, for the bulk as a whole.
    $result->assert_no_write_concern_error;

    return $result;
}
157              
# Bulk op type => [ write command name, command field that holds the ops ].
my %OP_MAP = map { $_->[0] => $_ } (
    [ insert => 'documents' ],
    [ update => 'updates' ],
    [ delete => 'deletes' ],
);
163              
# _execute_write_command_batch may split batches if they are too large and
# execute them separately
#
# Send one batch to the server as insert/update/delete write command(s).
# The command name and the field holding the ops come from %OP_MAP. Each
# command's parsed result is merged into the accumulator $result.
#
# Params:  $link   - connection link; supplies size limits and feature flags
#          $batch  - [ $type => \@docs ] as built by the batching helpers
#          $result - MongoDB::BulkWriteResult accumulator, mutated in place
# Returns: nothing.
# Dies:    MongoDB::DocumentError when a single-op chunk is still too large.
#          Other send errors are rethrown; if the error carries a result,
#          write errors parsed from it are asserted first. For ordered bulks,
#          $result->assert_no_write_error runs after every merged chunk.
# Note:    the docs in @$docs are replaced in place by their pre-encoded
#          forms, and update docs lose their 'is_replace' key.

sub _execute_write_command_batch {
    my ( $self, $link, $batch, $result ) = @_;

    my ( $type, $docs ) = @$batch;
    my ( $cmd, $op_key ) = @{ $OP_MAP{$type} };

    my $boolean_ordered = boolean( $self->ordered );
    my ( $db_name, $coll_name, $wc ) =
      map { $self->$_ } qw/db_name coll_name write_concern/;

    # Work queue of chunks still to send. It starts as the whole batch (the
    # caller's own array ref) and grows when a too-large chunk is split.
    my @left_to_send = ($docs);

    my $max_bson_size = $link->max_bson_object_size;
    my $supports_document_validation = $link->supports_document_validation;

    while (@left_to_send) {
        my $chunk = shift @left_to_send;
        # for update/insert, pre-encode docs as they need custom BSON handling
        # that can't be applied to an entire write command at once
        if ( $cmd eq 'update' ) {
            # take array of hash, validate and encode each update doc; since this
            # might be called more than once if chunks are getting split, check if
            # the update doc is already encoded; this also removes the 'is_replace'
            # field that needs to not be in the command sent to the server
            for ( my $i = 0; $i <= $#$chunk; $i++ ) {
                next if ref( $chunk->[$i]{u} ) eq 'BSON::Raw';
                my $is_replace = delete $chunk->[$i]{is_replace};
                $chunk->[$i]{u} = $self->_pre_encode_update( $max_bson_size, $chunk->[$i]{u}, $is_replace );
            }
        }
        elsif ( $cmd eq 'insert' ) {
            # take array of docs, encode each one while saving original or generated _id
            # field; since this might be called more than once if chunks are getting
            # split, check if the doc is already encoded
            for ( my $i = 0; $i <= $#$chunk; $i++ ) {
                unless ( ref( $chunk->[$i] ) eq 'BSON::Raw' ) {
                    $chunk->[$i] = $self->_pre_encode_insert( $max_bson_size, $chunk->[$i], '.' );
                };
            }
        }

        # Ordered list of command fields: the command name comes first, then
        # the ops, the ordered flag, and the write concern arguments.
        my $cmd_doc = [
            $cmd => $coll_name,
            $op_key => $chunk,
            ordered => $boolean_ordered,
            @{ $wc->as_args },
        ];

        # Only insert and update commands go through _maybe_bypass.
        if ( $cmd eq 'insert' || $cmd eq 'update' ) {
            $cmd_doc = $self->_maybe_bypass( $supports_document_validation, $cmd_doc );
        }

        my $op = MongoDB::Op::_Command->_new(
            db_name => $db_name,
            query => $cmd_doc,
            query_flags => {},
            bson_codec => $self->bson_codec,
            session => $self->session,
            retryable_write => $self->retryable_write,
            monitoring_callback => $self->monitoring_callback,
        );

        # `or` binds more loosely than `=`, so $cmd_result receives the value
        # of the eval (undef when it died); the do-block's own value is
        # discarded. The do-block either queues split chunks (leaving
        # $cmd_result false so we `redo` below) or dies.
        my $cmd_result = eval {
            $self->_is_retryable
              ? $self->client->send_retryable_write_op( $op )
              : $self->client->send_write_op( $op );
        } or do {
            my $error = $@ || "Unknown error";
            # A command-size error is raised before anything is sent to the
            # database, so it is handled ahead of any retryable-write or
            # server error handling.
            if ( $error->$_isa("MongoDB::_CommandSizeError") ) {
                if ( @$chunk == 1 ) {
                    MongoDB::DocumentError->throw(
                        message => "document too large",
                        document => $chunk->[0],
                    );
                }
                else {
                    # Put the smaller pieces at the front of the queue so
                    # they are sent next, preserving op order.
                    unshift @left_to_send, $self->_split_chunk( $chunk, $error->size );
                }
            }
            elsif ( $error->$_can( 'result' ) ) {
                # We are already going to explode from something here, but
                # BulkWriteResult has the correct parsing method to allow us to
                # check for write errors, as they have a higher priority than
                # write concern errors.
                MongoDB::BulkWriteResult->_parse_cmd_result(
                    op => $type,
                    op_count => scalar @$chunk,
                    result => $error->result,
                    cmd_doc => $cmd_doc,
                )->assert_no_write_error;
                # Explode with original error
                die $error;
            }
            else {
                die $error;
            }
        };

        # `redo` re-runs the while body without re-testing the loop
        # condition; the body then shifts the first of the split chunks.
        redo unless $cmd_result; # restart after a chunk split

        my $r = MongoDB::BulkWriteResult->_parse_cmd_result(
            op => $type,
            op_count => scalar @$chunk,
            result => $cmd_result,
            cmd_doc => $cmd_doc,
        );

        # append corresponding ops to errors
        # (only this chunk was sent, so each error's index is looked up in
        # $chunk; for inserts the element may be the pre-encoded doc)
        if ( $r->count_write_errors ) {
            for my $error ( @{ $r->write_errors } ) {
                $error->{op} = $chunk->[ $error->{index} ];
            }
        }

        $result->_merge_result($r);
        # Ordered bulks assert after every chunk, so a write error stops
        # further chunks from being sent.
        $result->assert_no_write_error if $boolean_ordered;
    }

    return;
}
288              
# Split an oversized chunk of ops into smaller chunks.
#
# $size is the encoded size of the rejected command built from @$chunk.
# From it we estimate an average size per op, then pack as many ops into
# each new chunk as should fit in MAX_BSON_WIRE_SIZE. Every new chunk holds
# at least one op. A per-chunk count of 0 (average op estimate at or above
# the wire limit) would make splice remove nothing and loop forever. A
# single-op chunk that is still too large is rejected by the caller instead.
#
# Note: @$chunk is emptied in place by splice.
#
# Returns the new chunks (array refs) in original op order; returns an
# empty list for an empty chunk.
sub _split_chunk {
    my ( $self, $chunk, $size ) = @_;

    # Nothing to split; this also avoids dividing by zero below.
    return unless @$chunk;

    my $avg_cmd_size = $size / @$chunk;
    my $new_cmds_per_chunk =
      $avg_cmd_size > 0 ? int( MAX_BSON_WIRE_SIZE / $avg_cmd_size ) : 1;

    # Guarantee progress: never ask splice for zero elements.
    $new_cmds_per_chunk = 1 if $new_cmds_per_chunk < 1;

    my @split_chunks;
    while (@$chunk) {
        push @split_chunks, [ splice( @$chunk, 0, $new_cmds_per_chunk ) ];
    }

    return @split_chunks;
}
302              
# Group the queue into batches, keeping the original op order.
#
# Consecutive ops of the same type share a batch until it holds
# $link->max_write_batch_size docs. A change of type, or a full batch,
# starts a new one. Each batch is [ $type => \@docs ].
sub _batch_ordered {
    my ( $self, $link, $queue ) = @_;

    my @batches;
    my $current_type = '';
    my $in_current   = 0;

    my $limit = $link->max_write_batch_size;

    for my $entry (@$queue) {
        my ( $kind, $payload ) = @$entry;

        my $start_new = $kind ne $current_type || $in_current == $limit;
        if ($start_new) {
            push @batches, [ $kind => [] ];
            $current_type = $kind;
            $in_current   = 0;
        }

        push @{ $batches[-1][1] }, $payload;
        $in_current++;
    }

    return @batches;
}
326              
# Group the queue into batches by op type, ignoring cross-type order.
#
# Docs of each type keep their relative order and are split into batches
# of at most $link->max_write_batch_size. Types with no queued ops yield no
# batches. The sequence of types in the result follows hash iteration
# order, so it is not fixed (it varies between runs on Perl 5.18+). Each
# batch is [ $type => \@docs ].
sub _batch_unordered {
    my ( $self, $link, $queue ) = @_;

    # One list of batches per known type, each seeded with one empty batch.
    my %by_type;
    $by_type{$_} = [ [] ] for keys %OP_MAP;

    my $limit = $link->max_write_batch_size;

    for my $entry (@$queue) {
        my ( $kind, $payload ) = @$entry;
        my $open_batch = $by_type{$kind}[-1];
        if ( @$open_batch == $limit ) {
            push @{ $by_type{$kind} }, [$payload];
        }
        else {
            push @$open_batch, $payload;
        }
    }

    my @batches;
    for my $kind ( keys %by_type ) {
        my $groups = $by_type{$kind};
        next unless @{ $groups->[-1] };
        push @batches, [ $kind => $_ ] for @$groups;
    }

    return @batches;
}
350              
# Execute one batch on a server without write commands.
#
# Each op is sent on its own with the legacy ops (MongoDB::Op::_InsertOne,
# _Update, _Delete), and each op's result is merged into $result.
#
# Params:  $link   - connection link passed to each op's execute
#          $batch  - [ $type => \@docs ] as built by the batching helpers
#          $result - MongoDB::BulkWriteResult accumulator, mutated in place
# Returns: nothing.
# Dies:    rethrows unexpected errors. For acknowledged ordered bulks,
#          $result->assert_no_write_error runs after each merged op.
#
# For an unacknowledged (w:0) write concern, ops are sent with a proxied
# w:1 write concern so an ordered bulk can stop at the first failure. In
# that mode results are not merged and errors are not thrown to the caller.
sub _execute_legacy_batch {
    my ( $self, $link, $batch, $result ) = @_;
    my ( $type, $docs ) = @$batch;
    my $ordered = $self->ordered;

    # if write concern is not safe, we have to proxy with a safe one so that
    # we can interrupt ordered bulks, even while ignoring the actual error
    my $wc  = $self->write_concern;
    my $w_0 = !$wc->is_acknowledged;
    if ($w_0) {
        my $wc_args = $wc->as_args();
        # Copy the write-concern hash instead of modifying the structure
        # returned by as_args() in place.
        my $wcs = @$wc_args ? { %{ $wc_args->[1] } } : {};
        $wcs->{w} = 1;
        $wc = MongoDB::WriteConcern->new($wcs);
    }

    # XXX successive inserts ought to get batched up, up to the max size for
    # batch, but we have no feedback on max size to know how many to put
    # together. I wonder if send_insert should return a list of write results,
    # or if it should just strip out however many docs it can from an arrayref
    # and leave the rest, and then this code can iterate.

    # Constructor arguments shared by every legacy op in this batch.
    my @common_args = (
        db_name             => $self->db_name,
        coll_name           => $self->coll_name,
        full_name           => $self->db_name . "." . $self->coll_name,
        write_concern       => $wc,
        bson_codec          => $self->bson_codec,
        monitoring_callback => $self->monitoring_callback,
    );

    for my $doc (@$docs) {

        my $op;
        if ( $type eq 'insert' ) {
            $op = MongoDB::Op::_InsertOne->_new(
                @common_args,
                document => $doc,
            );
        }
        elsif ( $type eq 'update' ) {
            $op = MongoDB::Op::_Update->_new(
                @common_args,
                filter     => $doc->{q},
                update     => $doc->{u},
                multi      => $doc->{multi},
                upsert     => $doc->{upsert},
                is_replace => $doc->{is_replace},
            );
        }
        elsif ( $type eq 'delete' ) {
            $op = MongoDB::Op::_Delete->_new(
                @common_args,
                filter   => $doc->{q},
                just_one => !!$doc->{limit},
            );
        }

        # NB: the result is assigned explicitly here. A `return` inside a
        # `do BLOCK` would return from this whole sub, dropping the error and
        # skipping the rest of the batch.
        my $op_result = eval { $op->execute($link) };
        if ( !$op_result ) {
            my $error = $@ || "Unknown error";
            if (   $error->$_isa("MongoDB::DatabaseError")
                && $error->result->does("MongoDB::Role::_WriteResult") )
            {
                # Server-reported write error: keep its result so it is
                # merged (or, for w:0, used to stop an ordered bulk) below.
                $op_result = $error->result;
            }
            elsif ( $w_0 && $error =~ /exceeds maximum size/ ) {
                # Size errors are not rethrown for w:0. $op_result stays
                # undef, so an ordered bulk stops below.
                $op_result = undef;
            }
            else {
                die $error;
            }
        }

        my $gle_result =
          $op_result ? MongoDB::BulkWriteResult->_parse_write_op($op_result) : undef;

        # Even for {w:0}, if the batch is ordered we have to break on the first
        # error, but we don't throw the error to the user.
        if ($w_0) {
            last if $ordered && ( !$gle_result || $gle_result->count_write_errors );
        }
        else {
            $result->_merge_result($gle_result);
            $result->assert_no_write_error if $ordered;
        }
    }

    return;
}
444              
445             1;