File Coverage

blib/lib/MongoDB/Op/_Aggregate.pm
Criterion Covered Total %
statement 24 55 43.6
branch 0 28 0.0
condition 0 20 0.0
subroutine 8 9 88.8
pod 0 1 0.0
total 32 113 28.3


line stmt bran cond sub pod time code
1             # Copyright 2015 - present MongoDB, Inc.
2             #
3             # Licensed under the Apache License, Version 2.0 (the "License");
4             # you may not use this file except in compliance with the License.
5             # You may obtain a copy of the License at
6             #
7             # http://www.apache.org/licenses/LICENSE-2.0
8             #
9             # Unless required by applicable law or agreed to in writing, software
10             # distributed under the License is distributed on an "AS IS" BASIS,
11             # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12             # See the License for the specific language governing permissions and
13             # limitations under the License.
14              
15 59     59   421 use strict;
  59         156  
  59         1804  
16 59     59   351 use warnings;
  59         157  
  59         2139  
17             package MongoDB::Op::_Aggregate;
18              
19             # Encapsulate aggregate operation; return MongoDB::QueryResult
20              
21 59     59   327 use version;
  59         128  
  59         404  
22             our $VERSION = 'v2.2.1';
23              
24 59     59   5006 use Moo;
  59         203  
  59         426  
25              
26 59     59   20677 use MongoDB::Op::_Command;
  59         160  
  59         1876  
27 59         541 use MongoDB::_Types qw(
28             ArrayOfHashRef
29             Boolish
30 59     59   384 );
  59         161  
31 59         392 use Types::Standard qw(
32             HashRef
33             InstanceOf
34             Num
35 59     59   71447 );
  59         156  
36              
37 59     59   54980 use namespace::clean;
  59         150  
  59         437  
38              
39             has client => (
40             is => 'ro',
41             required => 1,
42             isa => InstanceOf ['MongoDB::MongoClient'],
43             );
44              
45             has pipeline => (
46             is => 'ro',
47             required => 1,
48             isa => ArrayOfHashRef,
49             );
50              
51             has options => (
52             is => 'ro',
53             required => 1,
54             isa => HashRef,
55             );
56              
57             has has_out => (
58             is => 'ro',
59             required => 1,
60             isa => Boolish,
61             );
62              
63             has maxAwaitTimeMS => (
64             is => 'rw',
65             isa => Num,
66             );
67              
68             with $_ for qw(
69             MongoDB::Role::_PrivateConstructor
70             MongoDB::Role::_CollectionOp
71             MongoDB::Role::_ReadOp
72             MongoDB::Role::_WriteOp
73             MongoDB::Role::_CommandCursorOp
74             );
75              
76             sub execute {
77 0     0 0   my ( $self, $link, $topology ) = @_;
78              
79 0           my $options = $self->options;
80 0           my $is_2_6 = $link->supports_write_commands;
81              
82             # maxTimeMS isn't available until 2.6 and the aggregate command
83             # will reject it as unrecognized
84 0 0         delete $options->{maxTimeMS} unless $is_2_6;
85              
86             # bypassDocumentValidation isn't available until 3.2 (wire version 4); also, don't send it if false
87 0 0 0       unless ($link->supports_document_validation && $options->{bypassDocumentValidation}) {
88 0           delete $options->{bypassDocumentValidation};
89             }
90              
91 0 0 0       if ( defined $options->{collation} and !$link->supports_collation ) {
92 0           MongoDB::UsageError->throw(
93             "MongoDB host '" . $link->address . "' doesn't support collation" );
94             }
95              
96             # If 'cursor' is explicitly false, we disable using cursors, even
97             # for MongoDB 2.6+. This allows users operating with a 2.6+ mongos
98             # and pre-2.6 mongod in shards to avoid fatal errors. This
99             # workaround should be removed once MongoDB 2.4 is no longer supported.
100             my $use_cursor = $is_2_6
101 0   0       && ( !exists( $options->{cursor} ) || $options->{cursor} );
102              
103             # batchSize is not a command parameter itself like other options
104 0           my $batchSize = delete $options->{batchSize};
105              
106             # If we're doing cursors, we first respect an explicit batchSize option;
107             # next we fall back to the legacy (deprecated) cursor option batchSize; finally we
108             # just give an empty document. Other than batchSize we ignore any other
109             # legacy cursor options. If we're not doing cursors, don't send any
110             # cursor option at all, as servers will choke on it.
111 0 0         if ($use_cursor) {
112 0 0         if ( defined $batchSize ) {
    0          
113 0           $options->{cursor} = { batchSize => $batchSize };
114             }
115             elsif ( ref $options->{cursor} eq 'HASH' ) {
116 0           $batchSize = $options->{cursor}{batchSize};
117 0 0         $options->{cursor} = defined($batchSize) ? { batchSize => $batchSize } : {};
118             }
119             else {
120 0           $options->{cursor} = {};
121             }
122             }
123             else {
124 0           delete $options->{cursor};
125             }
126              
127 0           my $has_out = $self->has_out;
128              
129 0 0 0       if ( $self->coll_name eq 1 && ! $link->supports_db_aggregation ) {
130 0           MongoDB::Error->throw(
131             "Calling aggregate with a collection name of '1' is not supported on Wire Version < 6" );
132             }
133              
134             my @command = (
135             aggregate => $self->coll_name,
136             pipeline => $self->pipeline,
137             %$options,
138             (
139             $link->supports_aggregate_out_read_concern || (!$has_out && $link->supports_read_concern) ?
140 0           @{ $self->read_concern->as_args( $self->session) } : ()
141             ),
142             (
143 0 0 0       $has_out && $link->supports_helper_write_concern ? @{ $self->write_concern->as_args } : ()
  0 0 0        
144             ),
145             );
146              
147 0 0         my $op = MongoDB::Op::_Command->_new(
148             db_name => $self->db_name,
149             query => Tie::IxHash->new(@command),
150             query_flags => {},
151             bson_codec => $self->bson_codec,
152             ( $has_out ? () : ( read_preference => $self->read_preference ) ),
153             session => $self->session,
154             monitoring_callback => $self->monitoring_callback,
155             );
156              
157 0           my $res = $op->execute( $link, $topology );
158              
159 0 0         $res->assert_no_write_concern_error if $has_out;
160              
161             # For explain, we give the whole response as fields have changed in
162             # different server versions
163 0 0         if ( $options->{explain} ) {
164 0           return MongoDB::QueryResult->_new(
165             _client => $self->client,
166             _address => $link->address,
167             _full_name => '',
168             _bson_codec => $self->bson_codec,
169             _batch_size => 1,
170             _cursor_at => 0,
171             _limit => 0,
172             _cursor_id => 0,
173             _cursor_start => 0,
174             _cursor_flags => {},
175             _cursor_num => 1,
176             _docs => [ $res->output ],
177             );
178             }
179              
180             # Fake up a single-batch cursor if we didn't get a cursor response.
181             # We use the 'result' field as the first (and only) batch
182 0 0         if ( !$res->output->{cursor} ) {
183             $res->output->{cursor} = {
184             ns => '',
185             id => 0,
186 0   0       firstBatch => ( delete $res->output->{result} ) || [],
187             };
188             }
189              
190 0           return $self->_build_result_from_cursor($res);
191             }
192              
193             1;