File Coverage

blib/lib/MongoDB/_Protocol.pm
Criterion Covered Total %
statement 119 241 49.3
branch 10 90 11.1
condition 0 18 0.0
subroutine 26 44 59.0
pod 0 15 0.0
total 155 408 37.9


line stmt bran cond sub pod time code
1             # Copyright 2014 - present MongoDB, Inc.
2             #
3             # Licensed under the Apache License, Version 2.0 (the "License");
4             # you may not use this file except in compliance with the License.
5             # You may obtain a copy of the License at
6             #
7             # http://www.apache.org/licenses/LICENSE-2.0
8             #
9             # Unless required by applicable law or agreed to in writing, software
10             # distributed under the License is distributed on an "AS IS" BASIS,
11             # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12             # See the License for the specific language governing permissions and
13             # limitations under the License.
14              
15 61     61   73467 use v5.8.0;
  61         231  
16 61     61   365 use strict;
  61         153  
  61         1378  
17 61     61   334 use warnings;
  61         160  
  61         2289  
18              
19             package MongoDB::_Protocol;
20              
21 61     61   839 use version;
  61         2070  
  61         317  
22             our $VERSION = 'v2.2.0';
23              
24 61     61   5766 use MongoDB::_Constants;
  61         179  
  61         6781  
25 61     61   891 use MongoDB::Error;
  61         165  
  61         5996  
26 61     61   450 use MongoDB::_Types qw/ to_IxHash /;
  61         177  
  61         571  
27              
28 61     61   102115 use Compress::Zlib ();
  61         3240311  
  61         2979  
29              
30             use constant {
31 61         9538 OP_REPLY => 1, # Reply to a client request. responseTo is set
32             OP_UPDATE => 2001, # update document
33             OP_INSERT => 2002, # insert new document
34             RESERVED => 2003, # formerly used for OP_GET_BY_OID
35             OP_QUERY => 2004, # query a collection
36             OP_GET_MORE => 2005, # Get more data from a query. See Cursors
37             OP_DELETE => 2006, # Delete documents
38             OP_KILL_CURSORS => 2007, # Tell database client is done with a cursor
39             OP_COMPRESSED => 2012, # wire compression
40             OP_MSG => 2013, # generic bi-directional op code
41 61     61   562 };
  61         162  
42              
43             use constant {
44 61         4437 PERL58 => $] lt '5.010',
45             MIN_REPLY_LENGTH => 4 * 5 + 8 + 4 * 2,
46             MAX_REQUEST_ID => 2**31 - 1,
47 61     61   451 };
  61         148  
48              
49             # Perl < 5.10, pack doesn't have endianness modifiers, and the MongoDB wire
50             # protocol mandates little-endian order. For 5.10, we can use modifiers but
51             # before that we only work on platforms that are natively little-endian. We
52             # die during configuration on big endian platforms on 5.8
53              
54             use constant {
55 61         6428 P_HEADER => PERL58 ? "l4" : "l<4",
56 61     61   396 };
  61         157  
57              
58             # These ops all include P_HEADER already
59             use constant {
60 61         10346 P_UPDATE => PERL58 ? "l5Z*l" : "l<5Z*l<",
61             P_INSERT => PERL58 ? "l5Z*" : "l<5Z*",
62             P_QUERY => PERL58 ? "l5Z*l2" : "l<5Z*l<2",
63             P_GET_MORE => PERL58 ? "l5Z*la8" : "l<5Z*l<a8",
64             P_DELETE => PERL58 ? "l5Z*l" : "l<5Z*l<",
65             P_KILL_CURSORS => PERL58 ? "l6(a8)*" : "l<6(a8)*",
66             P_REPLY_HEADER => PERL58 ? "l5a8l2" : "l<5a8l<2",
67             P_COMPRESSED => PERL58 ? "l6C" : "l<6C",
68             P_MSG => PERL58 ? "l5" : "l<5",
69             P_MSG_PL_1 => PERL58 ? "lZ*" : "l<Z*",
70 61     61   452 };
  61         159  
71              
72             # struct MsgHeader {
73             # int32 messageLength; // total message size, including this
74             # int32 requestID; // identifier for this message
75             # int32 responseTo; // requestID from the original request
76             # // (used in responses from db)
77             # int32 opCode; // request type - see table below
78             # }
79             #
80             # Approach for MsgHeader is to write a header with 0 for length, then
81             # fix it up after the message is constructed. E.g.
82             # my $msg = pack( P_INSERT, 0, int(rand(2**32-1)), 0, OP_INSERT, 0, $ns ) . $bson_docs;
83             # substr( $msg, 0, 4, pack( P_INT32, length($msg) ) );
84              
85             use constant {
86             # length for MsgHeader
87 61         5263 P_HEADER_LENGTH =>
88             length(pack P_HEADER, 0, 0, 0, 0),
89             # length for OP_COMPRESSED
90             P_COMPRESSED_PREFIX_LENGTH =>
91             length(pack P_COMPRESSED, 0, 0, 0, 0, 0, 0, 0),
92             P_MSG_PREFIX_LENGTH =>
93             length(pack P_MSG, 0, 0, 0, 0, 0),
94 61     61   459 };
  61         160  
95              
96             # struct OP_MSG {
97             # MsgHeader header; // standard message header, with opCode 2013
98             # uint32 flagBits;
99             # Section+ sections;
100             # [uint32 checksum;]
101             # };
102             #
103             # struct Section {
104             # uint8 payloadType;
105             # union payload {
106             # document document; // payloadType == 0
107             # struct sequence { // payloadType == 1
108             # int32 size;
109             # cstring identifier;
110             # document* documents;
111             # };
112             # };
113             # };
114              
115             use constant {
116 61         5400 P_SECTION_PAYLOAD_TYPE => "C",
117             P_SECTION_SEQUENCE_SIZE => PERL58 ? "l" : "l<",
118 61     61   466 };
  61         162  
119              
120             use constant {
121 61         46281 P_SECTION_HEADER => P_SECTION_PAYLOAD_TYPE . P_SECTION_SEQUENCE_SIZE,
122             P_SECTION_PAYLOAD_TYPE_LENGTH => length( pack P_SECTION_PAYLOAD_TYPE, 0 ),
123             P_SECTION_SEQUENCE_SIZE_LENGTH => length( pack P_SECTION_SEQUENCE_SIZE, 0 ),
124 61     61   442 };
  61         180  
125              
126             # Takes a command, returns sections ready for joining
127              
128             sub prepare_sections {
129 1     1 0 5415 my ( $codec, $cmd ) = @_;
130              
131 1         6 my %split_commands = (
132             insert => 'documents',
133             update => 'updates',
134             delete => 'deletes',
135             );
136              
137 1         5 $cmd = to_IxHash( $cmd );
138              
139             # Command is always first key in cmd
140 1         95 my $command = do { my @keys = $cmd->Keys; $keys[0] };
  1         6  
  1         11  
141 1         4 my $ident = $split_commands{ $command };
142              
143 1 50       3 if ( defined $ident ) {
144 1         5 my $collection = $cmd->FETCH( $command );
145 1         9 my $docs = $cmd->FETCH( $ident );
146             # Assumes only a single split on the commands
147             return (
148             {
149             type => 0,
150             documents => [ [
151             # Done specifically to not alter $cmd.
152             # The command ($command from earlier) is assumed to be
153             # first in the Keys set
154 1 100       9 map { $_ eq $ident
  4         24  
155             ? ()
156             : ( $_, $cmd->FETCH( $_ ) )
157             } $cmd->Keys()
158             ] ],
159             },
160             {
161             type => 1,
162             identifier => $ident,
163             documents => $docs,
164             }
165             );
166             } else {
167             # Not a recognised command to split, just set up ready for later
168             return (
169             {
170 0         0 type => 0,
171             documents => [ $cmd ],
172             }
173             );
174             }
175             }
176              
177             # encode_section
178             #
179             # MongoDB::_Protocol::encode_section( $codec, {
180             # type => 0, # 0 or 1
181             # identifier => undef, # optional in type 0
182             # documents => [ $cmd ] # must be an array of documents
183             # });
184             #
185             # Takes a section hashref and encodes it for joining
186              
187             sub encode_section {
188 3     3 0 11544 my ( $codec, $section ) = @_;
189              
190 3         9 my $type = $section->{type};
191 3         6 my $ident = $section->{identifier};
192 3         6 my @docs = map { $codec->encode_one( $_ ) } @{ $section->{documents} };
  4         77  
  3         9  
193              
194 3         188 my $pl;
195 3 100       12 if ( $type == 0 ) {
    50          
196             # Assume a single doc if payload type is 0
197 1         3 $pl = $docs[0];
198             } elsif ( $type == 1 ) {
199 2         15 $pl = pack( P_MSG_PL_1, 0, $ident )
200             . join( '', @docs );
201             # calculate size
202 2         8 substr( $pl, 0, 4, pack( P_SECTION_SEQUENCE_SIZE, length( $pl ) ) );
203             } else {
204 0         0 MongoDB::ProtocolError->throw("Encode: Unsupported section payload type");
205             }
206              
207             # Prepend the section type
208 3         13 $pl = pack( P_SECTION_PAYLOAD_TYPE, $type ) . $pl;
209              
210 3         11 return $pl;
211             }
212              
213             # decode_section
214             #
215             # MongoDB::_Protocol::decode_section( $section )
216             #
217             # Takes an encoded section and decodes it, exactly the opposite of encode_section.
218              
219             sub decode_section {
220 14     14 0 13001 my ( $doc ) = @_;
221 14         28 my ( $type, $ident, @enc_docs );
222 14         25 my $section = {};
223              
224 14         45 ( $type ) = unpack( 'C', $doc );
225 14         34 my $payload = substr( $doc, P_SECTION_PAYLOAD_TYPE_LENGTH );
226              
227 14         36 $section->{ type } = $type;
228              
229             # Pull size off and double check. Size is in the same place regardless of
230             # payload type, as it's a similar struct to a raw document
231 14         39 my ( $pl_size ) = unpack( P_SECTION_SEQUENCE_SIZE, $payload );
232 14 50       41 unless ( $pl_size == length( $payload ) ) {
233 0         0 MongoDB::ProtocolError->throw("Decode: Section size incorrect");
234             }
235              
236 14 100       47 if ( $type == 0 ) {
    50          
237             # payload is a raw document
238 6         16 push @enc_docs, $payload;
239             } elsif ( $type == 1 ) {
240 8         24 $payload = substr( $payload, P_SECTION_SEQUENCE_SIZE_LENGTH );
241             # Pull out then remove
242 8         26 ( $ident ) = unpack( 'Z*', $payload );
243 8         19 $section->{ identifier } = $ident;
244 8         21 $payload = substr( $payload, length ( $ident ) + 1 ); # add one for null termination
245              
246 8         22 while ( length $payload ) {
247 12         23 my $doc_size = unpack( P_SECTION_SEQUENCE_SIZE, $payload );
248 12         26 my $doc = substr( $payload, 0, $doc_size );
249 12         23 $payload = substr( $payload, $doc_size );
250 12         46 push @enc_docs, $doc;
251             }
252             } else {
253 0         0 MongoDB::ProtocolError->throw("Decode: Unsupported section payload type");
254             }
255 14         32 $section->{ documents } = \@enc_docs;
256              
257 14         37 return $section;
258             }
259              
260             # method split_sections( $msg )
261             #
262             # Splits sections based on their payload length header. Returns an array of
263             # sections in packed form
264              
265             sub split_sections {
266 5     5 0 36451 my $msg = shift;
267 5         12 my @sections;
268 5         20 while ( length $msg ) {
269             # get first section length
270 11         52 my ( undef, $section_length ) = unpack( P_SECTION_HEADER, $msg );
271              
272             # Add the payload type length as we reached over it for the length
273 11         36 my $section = substr( $msg, 0, $section_length + P_SECTION_PAYLOAD_TYPE_LENGTH );
274              
275 11         31 push @sections, decode_section( $section );
276              
277 11         35 $msg = substr( $msg, $section_length + P_SECTION_PAYLOAD_TYPE_LENGTH );
278             }
279              
280 5         21 return @sections;
281             }
282              
283             use constant {
284 61         53800 MSG_FB_CHECKSUM => 0,
285             MSG_FB_MORE_TO_COME => 1,
286 61     61   568 };
  61         151  
287              
288             sub write_msg {
289 0     0 0   my ( $codec, $flags, $cmd ) = @_;
290 0           my $flagbits = 0;
291             # checksum is reserved for future use
292 0 0         if ( $flags ) {
293             $flagbits =
294             ( $flags->{checksum_present} ? 1 << MSG_FB_CHECKSUM : 0 )
295 0 0         | ( $flags->{more_to_come} ? 1 << MSG_FB_MORE_TO_COME : 0 );
    0          
296             }
297              
298 0           my $request_id = int( rand( MAX_REQUEST_ID ) );
299              
300 0           my @sections = prepare_sections( $codec, $cmd );
301              
302 0           my $encoded_sections = join ('', ( map { encode_section( $codec, $_ ) } @sections ) );
  0            
303              
304 0           my $msg = pack( P_MSG, 0, $request_id, 0, OP_MSG, 0 )
305             . $encoded_sections;
306 0           substr( $msg, 0, 4, pack( P_INT32, length($msg) ) );
307 0           return ( $msg, $request_id );
308             }
309              
310             # struct OP_COMPRESSED {
311             # MsgHeader header; // standard message header
312             # int32_t originalOpcode; // wrapped op code
313             # int32_t uncompressedSize; // size of deflated wo. header
314             # uint8_t compressorId; // compressor
315             # char* compressedMessage; // compressed contents
316             # };
317              
318             # Note that Zlib is in perl core (since 5.9.3) so shouldn't need lazy loading
319             sub _assert_zstd {
320             MongoDB::UsageError->throw(qq/Compress::Zstd must be installed to support zstd compression\n/)
321 0 0   0     unless eval { require Compress::Zstd };
  0            
322             }
323              
324             sub _assert_snappy {
325             MongoDB::UsageError->throw(qq/Compress::Snappy must be installed to support snappy compression\n/)
326 0 0   0     unless eval { require Compress::Snappy };
  0            
327             }
328              
329             # decompressors indexed by ID.
330             my @DECOMPRESSOR = (
331             # none
332             sub { shift },
333             # snappy
334             sub { Compress::Snappy::decompress(shift) },
335             # zlib
336             sub { Compress::Zlib::uncompress(shift) },
337             # zstd
338             sub { Compress::Zstd::decompress(shift) },
339             );
340              
341             # construct compressor by name with options
342             sub get_compressor {
343 0     0 0   my ($name, $comp_opt) = @_;
344              
345 0 0         if ($name eq 'none') {
    0          
    0          
    0          
346             return {
347             id => 0,
348 0     0     callback => sub { shift },
349 0           };
350             }
351             elsif ($name eq 'snappy') {
352 0           _assert_snappy();
353             return {
354             id => 1,
355 0     0     callback => sub { Compress::Snappy::compress(shift) },
356 0           };
357             }
358             elsif ($name eq 'zlib') {
359 0           my $level = $comp_opt->{zlib_compression_level};
360 0 0 0       $level = undef
361             if defined $level and $level < 0;
362             return {
363             id => 2,
364             callback => sub {
365 0 0   0     return Compress::Zlib::compress(
366             $_[0],
367             defined($level) ? $level : Compress::Zlib::Z_DEFAULT_COMPRESSION(),
368             );
369             },
370 0           };
371             }
372             elsif ($name eq 'zstd') {
373 0           _assert_zstd();
374             return {
375             id => 3,
376 0     0     callback => sub { Compress::Zstd::compress(shift) },
377 0           };
378             }
379             else {
380 0           MongoDB::ProtocolError->throw("Unknown compressor '$name'");
381             }
382             }
383              
384             # compress message
385             sub compress {
386 0     0 0   my ($msg, $compressor) = @_;
387              
388 0           my ($len, $request_id, $response_to, $op_code)
389             = unpack(P_HEADER, $msg);
390              
391 0           $msg = substr $msg, P_HEADER_LENGTH;
392              
393             my $msg_comp = pack(
394             P_COMPRESSED,
395             0, $request_id, $response_to, OP_COMPRESSED,
396             $op_code,
397             length($msg),
398             $compressor->{id},
399 0           ).$compressor->{callback}->($msg);
400              
401 0           substr($msg_comp, 0, 4, pack(P_INT32, length($msg_comp)));
402 0           return $msg_comp;
403             }
404              
405             # attempt to uncompress message
406             # messages that aren't OP_COMPRESSED are returned as-is
407             sub try_uncompress {
408 0     0 0   my ($msg) = @_;
409              
410 0           my ($len, $request_id, $response_to, $op_code, $orig_op_code, $orig_len, $comp_id)
411             = unpack(P_COMPRESSED, $msg);
412              
413 0 0         return $msg
414             if $op_code != OP_COMPRESSED;
415              
416 0           $msg = substr $msg, P_COMPRESSED_PREFIX_LENGTH;
417              
418 0 0         my $decompressor = $DECOMPRESSOR[$comp_id]
419             or MongoDB::ProtocolError->throw("Unknown compressor ID '$comp_id'");
420              
421 0           my $decomp_msg = $decompressor->($msg);
422 0           my $done =
423             pack(P_HEADER, $orig_len, $request_id, $response_to, $orig_op_code)
424             .$decomp_msg;
425              
426 0           return $done;
427              
428             }
429              
430             # struct OP_UPDATE {
431             # MsgHeader header; // standard message header
432             # int32 ZERO; // 0 - reserved for future use
433             # cstring fullCollectionName; // "dbname.collectionname"
434             # int32 flags; // bit vector. see below
435             # document selector; // the query to select the document
436             # document update; // specification of the update to perform
437             # }
438              
439             use constant {
440 61         12215 U_UPSERT => 0,
441             U_MULTI_UPDATE => 1,
442 61     61   562 };
  61         177  
443              
444             sub write_update {
445 0     0 0   my ( $ns, $selector, $update, $flags ) = @_;
446 0           utf8::encode($ns);
447              
448 0           my $request_id = int( rand( MAX_REQUEST_ID ) );
449              
450 0           my $bitflags = 0;
451 0 0         if ($flags) {
452             $bitflags =
453             ( $flags->{upsert} ? 1 << U_UPSERT : 0 )
454 0 0         | ( $flags->{multi} ? 1 << U_MULTI_UPDATE : 0 );
    0          
455             }
456              
457 0           my $msg =
458             pack( P_UPDATE, 0, $request_id, 0, OP_UPDATE, 0, $ns, $bitflags )
459             . $selector
460             . $update;
461 0           substr( $msg, 0, 4, pack( P_INT32, length($msg) ) );
462 0           return $msg, $request_id;
463             }
464              
465             # struct OP_INSERT {
466             # MsgHeader header; // standard message header
467             # int32 flags; // bit vector - see below
468             # cstring fullCollectionName; // "dbname.collectionname"
469             # document* documents; // one or more documents to insert into the collection
470             # }
471              
472 61     61   543 use constant { I_CONTINUE_ON_ERROR => 0, };
  61         201  
  61         11539  
473              
474             sub write_insert {
475 0     0 0   my ( $ns, $bson_docs, $flags ) = @_;
476 0           utf8::encode($ns);
477              
478 0           my $request_id = int( rand( MAX_REQUEST_ID ) );
479              
480 0           my $bitflags = 0;
481 0 0         if ($flags) {
482 0 0         $bitflags = ( $flags->{continue_on_error} ? 1 << I_CONTINUE_ON_ERROR : 0 );
483             }
484              
485 0           my $msg =
486             pack( P_INSERT, 0, $request_id, 0, OP_INSERT, $bitflags, $ns )
487             . $bson_docs;
488 0           substr( $msg, 0, 4, pack( P_INT32, length($msg) ) );
489 0           return $msg, $request_id;
490             }
491              
492             # struct OP_QUERY {
493             # MsgHeader header; // standard message header
494             # int32 flags; // bit vector of query options. See below for details.
495             # cstring fullCollectionName ; // "dbname.collectionname"
496             # int32 numberToSkip; // number of documents to skip
497             # int32 numberToReturn; // number of documents to return
498             # // in the first OP_REPLY batch
499             # document query; // query object. See below for details.
500             # [ document returnFieldsSelector; ] // Optional. Selector indicating the fields
501             # // to return. See below for details.
502             # }
503              
504             use constant {
505 61         23816 Q_TAILABLE => 1,
506             Q_SLAVE_OK => 2,
507             Q_NO_CURSOR_TIMEOUT => 4,
508             Q_AWAIT_DATA => 5,
509             Q_EXHAUST => 6, # unsupported (PERL-282)
510             Q_PARTIAL => 7,
511 61     61   496 };
  61         163  
512              
513             sub write_query {
514 0     0 0   my ( $ns, $query, $fields, $skip, $batch_size, $flags ) = @_;
515              
516 0           utf8::encode($ns);
517              
518 0           my $bitflags = 0;
519 0 0         if ($flags) {
520             $bitflags =
521             ( $flags->{tailable} ? 1 << Q_TAILABLE : 0 )
522             | ( $flags->{slave_ok} ? 1 << Q_SLAVE_OK : 0 )
523             | ( $flags->{await_data} ? 1 << Q_AWAIT_DATA : 0 )
524             | ( $flags->{immortal} ? 1 << Q_NO_CURSOR_TIMEOUT : 0 )
525 0 0         | ( $flags->{partial} ? 1 << Q_PARTIAL : 0 );
    0          
    0          
    0          
    0          
526             }
527              
528 0           my $request_id = int( rand( MAX_REQUEST_ID ) );
529              
530 0 0 0       my $msg =
531             pack( P_QUERY, 0, $request_id, 0, OP_QUERY, $bitflags, $ns, $skip, $batch_size )
532             . $query
533             . ( defined $fields && length $fields ? $fields : '' );
534 0           substr( $msg, 0, 4, pack( P_INT32, length($msg) ) );
535 0           return ( $msg, $request_id );
536             }
537              
538             # struct {
539             # MsgHeader header; // standard message header
540             # int32 ZERO; // 0 - reserved for future use
541             # cstring fullCollectionName; // "dbname.collectionname"
542             # int32 numberToReturn; // number of documents to return
543             # int64 cursorID; // cursorID from the OP_REPLY
544             # }
545              
546             # We treat cursor_id as an opaque string so we don't have to depend
547             # on 64-bit integer support
548              
549             sub write_get_more {
550 0     0 0   my ( $ns, $cursor_id, $batch_size ) = @_;
551 0           utf8::encode($ns);
552 0           my $request_id = int( rand( MAX_REQUEST_ID ) );
553 0           my $msg =
554             pack( P_GET_MORE, 0, $request_id, 0, OP_GET_MORE, 0, $ns, $batch_size,
555             _pack_cursor_id($cursor_id) );
556 0           substr( $msg, 0, 4, pack( P_INT32, length($msg) ) );
557 0           return ( $msg, $request_id );
558             }
559              
560             # struct {
561             # MsgHeader header; // standard message header
562             # int32 ZERO; // 0 - reserved for future use
563             # cstring fullCollectionName; // "dbname.collectionname"
564             # int32 flags; // bit vector - see below for details.
565             # document selector; // query object. See below for details.
566             # }
567              
568 61     61   527 use constant { D_SINGLE_REMOVE => 0, };
  61         157  
  61         10977  
569              
570             sub write_delete {
571 0     0 0   my ( $ns, $selector, $flags ) = @_;
572 0           utf8::encode($ns);
573              
574 0           my $request_id = int( rand( MAX_REQUEST_ID ) );
575              
576 0           my $bitflags = 0;
577 0 0         if ($flags) {
578 0 0         $bitflags = ( $flags->{just_one} ? 1 << D_SINGLE_REMOVE : 0 );
579             }
580              
581 0           my $msg =
582             pack( P_DELETE, 0, $request_id, 0, OP_DELETE, 0, $ns, $bitflags )
583             . $selector;
584 0           substr( $msg, 0, 4, pack( P_INT32, length($msg) ) );
585 0           return $msg, $request_id;
586             }
587              
588             # legacy alias
589             {
590 61     61   509 no warnings 'once';
  61         203  
  61         10722  
591             *write_remove = \&write_delete;
592             }
593              
594             # struct {
595             # MsgHeader header; // standard message header
596             # int32 ZERO; // 0 - reserved for future use
597             # int32 numberOfCursorIDs; // number of cursorIDs in message
598             # int64* cursorIDs; // sequence of cursorIDs to close
599             # }
600              
601             sub write_kill_cursors {
602 0     0 0   my (@cursors) = map _pack_cursor_id($_), @_;
603              
604 0           my $request_id = int( rand( MAX_REQUEST_ID ) );
605              
606 0           my $msg = pack( P_KILL_CURSORS,
607             0, $request_id,
608             0, OP_KILL_CURSORS, 0, scalar(@cursors), @cursors );
609 0           substr( $msg, 0, 4, pack( P_INT32, length($msg) ) );
610 0           return $msg, $request_id;
611             }
612              
613             # struct {
614             # // MessageHeader
615             # int32 messageLength; // total message size, including this
616             # int32 requestID; // identifier for this message
617             # int32 responseTo; // requestID from the original request
618             # int32 opCode; // request type - see table below
619             # // OP_REPLY fields
620             # int32 responseFlags; // bit vector - see details below
621             # int64 cursorID; // cursor id if client needs to do get more's
622             # int32 startingFrom; // where in the cursor this reply is starting
623             # int32 numberReturned; // number of documents in the reply
624             # document* documents; // documents
625             # }
626              
627             # We treat cursor_id as an opaque string so we don't have to depend
628             # on 64-bit integer support
629              
630             # flag bits relevant to drivers
631             use constant {
632 61         34147 R_CURSOR_NOT_FOUND => 0,
633             R_QUERY_FAILURE => 1,
634             R_AWAIT_CAPABLE => 3,
635 61     61   462 };
  61         196  
636              
637             sub parse_reply {
638 0     0 0   my ( $msg, $request_id ) = @_;
639 0 0         MongoDB::ProtocolError->throw("response was truncated")
640             if length($msg) < MIN_REPLY_LENGTH;
641              
642 0           $msg = try_uncompress($msg);
643              
644             my (
645 0           $len, $msg_id, $response_to, $opcode, $bitflags, $cursor_id, $starting_from,
646             $number_returned
647             ) = unpack( P_MSG, $msg );
648              
649             # pre-check all conditions using a modifier in one statement for speed;
650             # disambiguate afterwards only if an error exists
651              
652 0 0 0       do {
      0        
      0        
653              
654 0 0         if ( length($msg) < $len ) {
655 0           MongoDB::ProtocolError->throw("response was truncated");
656             }
657              
658 0 0 0       if ( $opcode != OP_REPLY && $opcode != OP_MSG ) {
659 0           MongoDB::ProtocolError->throw("response was not OP_REPLY or OP_MSG");
660             }
661              
662 0 0         if ( $response_to != $request_id ) {
663 0           MongoDB::ProtocolError->throw(
664             "response ID ($response_to) did not match request ID ($request_id)");
665             }
666             }
667             if ( length($msg) < $len )
668             || ( ( $opcode != OP_REPLY ) && ( $opcode != OP_MSG ) )
669             || ( $response_to != $request_id );
670              
671              
672 0 0         if ( $opcode == OP_MSG ) {
673             # XXX Extract and check checksum - future support of crc32c
674 0           my @sections = split_sections( substr( $msg, P_MSG_PREFIX_LENGTH ) );
675             # We have none of the other stuff? maybe flags... and an array of docs? erm
676             return {
677             flags => {
678             checksum_present => vec( $bitflags, MSG_FB_CHECKSUM, 1 ),
679             more_to_come => vec( $bitflags, MSG_FB_MORE_TO_COME, 1 ),
680             },
681             # XXX Assumes the server never sends a type 1 payload. May change in future
682 0           docs => $sections[0]->{documents}->[0]
683             };
684             } else {
685             # Yes, it's two unpacks, but it's just easier than mapping through to the right size
686             (
687 0           $len, $msg_id, $response_to, $opcode, $bitflags, $cursor_id, $starting_from,
688             $number_returned
689             ) = unpack( P_REPLY_HEADER, $msg );
690             }
691              
692             # returns non-zero cursor_id as blessed object to identify it as an
693             # 8-byte opaque ID rather than an ambiguous Perl scalar. N.B. cursors
694             # from commands are handled differently: they are perl integers or
695             # else Math::BigInt objects
696              
697 0 0         substr( $msg, 0, MIN_REPLY_LENGTH, '' ),
698             return {
699             flags => {
700             cursor_not_found => vec( $bitflags, R_CURSOR_NOT_FOUND, 1 ),
701             query_failure => vec( $bitflags, R_QUERY_FAILURE, 1 ),
702             },
703             cursor_id => (
704             ( $cursor_id eq CURSOR_ZERO )
705             ? 0
706             : bless( \$cursor_id, "MongoDB::_CursorID" )
707             ),
708             starting_from => $starting_from,
709             number_returned => $number_returned,
710             docs => $msg,
711             };
712             }
713              
714             #--------------------------------------------------------------------------#
715             # utility functions
716             #--------------------------------------------------------------------------#
717              
718             # CursorID's can come in 3 forms:
719             #
720             # 1. MongoDB::_CursorID object (a blessed reference to an 8-byte string)
721             # 2. A perl scalar (an integer)
722             # 3. A Math::BigInt object (64 bit integer on 32-bit perl)
723             #
724             # The _pack_cursor_id function converts any of them to a packed Int64 for
725             # use in OP_GET_MORE or OP_KILL_CURSORS
726             sub _pack_cursor_id {
727 0     0     my $cursor_id = shift;
728 0 0         if ( ref($cursor_id) eq "MongoDB::_CursorID" ) {
    0          
729 0           $cursor_id = $$cursor_id;
730             }
731             elsif ( ref($cursor_id) eq "Math::BigInt" ) {
732 0           my $as_hex = $cursor_id->as_hex; # big-endian hex
733 0           substr( $as_hex, 0, 2, '' ); # remove "0x"
734 0           my $len = length($as_hex);
735 0 0         substr( $as_hex, 0, 0, "0" x ( 16 - $len ) ) if $len < 16; # pad to quad length
736 0           $cursor_id = pack( "H*", $as_hex ); # packed big-endian
737 0           $cursor_id = reverse($cursor_id); # reverse to little-endian
738             }
739             elsif (HAS_INT64) {
740             # pack doesn't have endianness modifiers before perl 5.10.
741             # We die during configuration on big-endian platforms on 5.8
742 0 0         $cursor_id = pack( $] lt '5.010' ? "q" : "q<", $cursor_id );
743             }
744             else {
745             # we on 32-bit perl *and* have a cursor ID that fits in 32 bits,
746             # so pack it as long and pad out to a quad
747             $cursor_id = pack( $] lt '5.010' ? "l" : "l<", $cursor_id ) . ( "\0" x 4 );
748             }
749              
750 0           return $cursor_id;
751             }
752              
753             1;
754              
755             # vim: ts=4 sts=4 sw=4 et: