File Coverage

blib/lib/MongoDBx/Protocol.pm
Criterion Covered Total %
statement 219 232 94.4
branch 24 44 54.5
condition 6 10 60.0
subroutine 36 36 100.0
pod 10 10 100.0
total 295 332 88.8


line stmt bran cond sub pod time code
1             package MongoDBx::Protocol;
2             {
3             $MongoDBx::Protocol::VERSION = '0.03';
4             }
5              
6 1     1   76144 use strict;
  1         4  
  1         57  
7 1     1   7 use warnings;
  1         3  
  1         43  
8              
9             # ABSTRACT: pure perl implementation of MongoDB protocol
10              
11              
12              
13             # for '<' in pack/unpack templates (ensure that bytes order is little-endian)
14             require v5.8.5;
15              
16 1     1   864 use Bit::Vector;
  1         5463  
  1         93  
17 1     1   901 use BSON qw();
  1         36246  
  1         34  
18 1     1   10 use Carp qw(croak);
  1         2  
  1         73  
19 1     1   2751 use Params::Validate qw(:all);
  1         13729  
  1         5823  
20              
21             my $OP_CODES = {
22             reply => 1,
23             msg => 1000,
24             update => 2001,
25             insert => 2002,
26             query => 2004,
27             getmore => 2005,
28             delete => 2006,
29             kill_cursors => 2007,
30             };
31             my $OP_CODE2STR = {};
32             foreach my $str (keys %$OP_CODES) {
33             $OP_CODE2STR->{ $OP_CODES->{$str} } = $str;
34             }
35              
36             my $FLAG2BIT = {
37             update => {
38             Upsert => 0,
39             MultiUpdate => 1,
40             },
41              
42             insert => {
43             ContinueOnError => 0,
44             },
45              
46             query => {
47             TailableCursor => 1,
48             SlaveOk => 2,
49             OplogReplay => 3,
50             NoCursorTimeout => 4,
51             AwaitData => 5,
52             Exhaust => 6,
53             Partial => 7,
54             },
55              
56             delete => {
57             SingleRemove => 0,
58             },
59              
60             reply => {
61             CursorNotFound => 0,
62             QueryFailure => 1,
63             ShardConfigStale => 2,
64             AwaitCapable => 3,
65             },
66             };
67              
68             my $BIT2FLAG = {};
69             foreach my $op_code_str (keys %$FLAG2BIT) {
70             foreach my $flag_name (keys %{$FLAG2BIT->{$op_code_str}}) {
71             my $bit_value = $FLAG2BIT->{$op_code_str}->{$flag_name};
72             $BIT2FLAG->{$op_code_str}->{$bit_value} = $flag_name;
73             }
74             }
75              
76             my $REPLY_FLAGS = {
77             0 => 'CursorNotFound',
78             1 => 'QueryFailure',
79             2 => 'ShardConfigStale',
80             3 => 'AwaitCapable',
81             };
82              
83             my $HEADER_FIELD = { header => { type => HASHREF, default => {} } };
84             my $FLAGS_FIELD = { flags => { type => HASHREF, default => {} } };
85             my $COLLECTION_FIELD = { fullCollectionName => { type => SCALAR } };
86              
87              
88              
89             sub new {
90 9     9 1 10635 my $class = shift;
91 9         15 my $self = {};
92 9         17 bless $self, $class;
93 9         29 return $self;
94             }
95              
96              
97             sub update {
98 1     1 1 127 my $self = shift;
99 1         27 my $p = validate(@_, {
100             %$HEADER_FIELD,
101             %$FLAGS_FIELD,
102             %$COLLECTION_FIELD,
103             selector => { type => HASHREF },
104             update => { type => HASHREF },
105             });
106              
107 1         6 my $op_code_str = 'update';
108              
109 1         3 my $msg = _int32(0) .
110             _cstring($p->{fullCollectionName}) .
111             _flags($p->{flags}, $op_code_str) .
112             _documents($p->{selector}) .
113             _documents($p->{update});
114              
115 1         4 return $self->_with_header($p->{header}, \$msg, $op_code_str);
116             }
117              
118              
119             sub insert {
120 1     1 1 106 my $self = shift;
121 1         24 my $p = validate(@_, {
122             %$HEADER_FIELD,
123             %$FLAGS_FIELD,
124             %$COLLECTION_FIELD,
125             documents => { type => ARRAYREF },
126             });
127              
128 1         5 my $op_code_str = 'insert';
129              
130 1         4 my $msg = _flags($p->{flags}, $op_code_str) .
131             _cstring($p->{fullCollectionName}) .
132             _documents($p->{documents});
133              
134 1         5 return $self->_with_header($p->{header}, \$msg, $op_code_str);
135              
136             }
137              
138              
139             sub query {
140 1     1 1 106 my $self = shift;
141 1         34 my $p = validate(@_, {
142             %$HEADER_FIELD,
143             %$FLAGS_FIELD,
144             %$COLLECTION_FIELD,
145             numberToSkip => { type => SCALAR, regex => qr/^\d+$/o, default => 0 },
146             numberToReturn => { type => SCALAR, regex => qr/^\d+$/o, default => 1 },
147             query => { type => HASHREF },
148             returnFieldSelector => { type => HASHREF, optional => 1 },
149             });
150              
151 1         34 my $op_code_str = 'query';
152 1         4 my $msg = _flags($p->{flags}, $op_code_str) .
153             _cstring($p->{fullCollectionName}) .
154             _int32($p->{numberToSkip}) .
155             _int32($p->{numberToReturn}) .
156             _documents($p->{query}) .
157             _documents($p->{returnFieldSelector});
158              
159 1         44 return $self->_with_header($p->{header}, \$msg, $op_code_str);
160             }
161              
162              
163              
164             sub getmore {
165 1     1 1 112 my $self = shift;
166 1         31 my $p = validate(@_, {
167             %$HEADER_FIELD,
168             %$COLLECTION_FIELD,
169             numberToReturn => { type => SCALAR, regex => qr/^\d+$/o, default => 1 },
170             cursorID => { type => SCALAR, regex => qr/^\d+$/o },
171             });
172              
173 1         32 my $msg = _int32(0) .
174             _cstring($p->{fullCollectionName}) .
175             _int32($p->{numberToReturn}) .
176             _int64($p->{cursorID});
177              
178 1         4 return $self->_with_header($p->{header}, \$msg, 'getmore');
179             }
180              
181              
182             sub delete {
183 1     1 1 1568 my $self = shift;
184 1         122 my $p = validate(@_, {
185             %$HEADER_FIELD,
186             %$COLLECTION_FIELD,
187             %$FLAGS_FIELD,
188             selector => { type => HASHREF },
189             });
190              
191 1         6 my $op_code_str = 'delete';
192              
193 1         4 my $msg = _int32(0) .
194             _cstring($p->{fullCollectionName}) .
195             _flags($p->{flags}, $op_code_str) .
196             _documents($p->{selector});
197              
198 1         6 return $self->_with_header($p->{header}, \$msg, $op_code_str);
199             }
200              
201              
202              
203             sub kill_cursors {
204 1     1 1 172 my $self = shift;
205 1         16 my $p = validate(@_, {
206             %$HEADER_FIELD,
207             cursorIDs => { type => ARRAYREF },
208             });
209              
210 1         5 my $ids_msg = '';
211 1         2 foreach my $id (@{$p->{cursorIDs}}) {
  1         10  
212 2         5 $ids_msg .= _int64($id);
213             }
214              
215 1         3 my $msg = _int32(0) .
216 1         4 _int32(scalar @{$p->{cursorIDs}}) .
217             $ids_msg;
218              
219 1         4 return $self->_with_header($p->{header}, \$msg, 'kill_cursors');
220             }
221              
222              
223              
224             sub msg {
225 1     1 1 103 my $self = shift;
226 1         15 my $p = validate(@_, {
227             %$HEADER_FIELD,
228             message => { type => SCALAR },
229             });
230              
231 1         5 my $msg = _cstring($p->{message});
232              
233 1         5 return $self->_with_header($p->{header}, \$msg, 'msg');
234             }
235              
236              
237             sub reply {
238 1     1 1 123 my $self = shift;
239              
240 1         28 my $p = validate(@_, {
241             %$HEADER_FIELD,
242             responseFlags => { type => HASHREF, default => {} },
243             cursorID => { type => SCALAR, regex => qr/^\d+$/o },
244             startingFrom => { type => SCALAR, regex => qr/^\d+$/o, default => 0 },
245             documents => { type => ARRAYREF },
246             });
247              
248 1         37 my $op_code_str = 'reply';
249              
250 1         11 my $msg = _flags($p->{responseFlags}, $op_code_str) .
251             _int64($p->{cursorID}) .
252             _int32($p->{startingFrom}) .
253 1         4 _int32( scalar @{$p->{documents}} ) .
254             _documents($p->{documents});
255              
256 1         6 return $self->_with_header($p->{header}, \$msg, $op_code_str);
257             }
258              
259              
260             sub decode {
261 8     8 1 7347 my $self = shift;
262 8         11 my ($data, $options) = @_;
263              
264 8 50       19 croak "empty data" unless (defined($data));
265 8 50       18 croak "too small data" if (length($data) < 4);
266 8         26 my @a = unpack("C*", substr($data, 0, 4));
267 8         24 my $len = _decode_int32(substr($data, 0, 4));
268 8 50       20 if (length($data) != $len) {
269 0         0 die "can't parse data, real length of the data != length in the header";
270             }
271              
272 8         20 my $header = _decode_header(substr($data, 0, 4*4, ''));
273              
274 8         12 my $op_code = $header->{opCode};
275 8         12 my $decode_method = "_decode_${op_code}";
276 8         30 my $res = $self->$decode_method(\$data, $options);
277              
278 8         14 $res->{header} = $header;
279              
280 8         32 return $res;
281             }
282              
283             sub _decode_update {
284 1     1   2 my ($self, $data_ref, $options) = @_;
285 1         2 my $data = $$data_ref;
286              
287 1         4 my $zero = _decode_int32(substr($data, 0, 4, ''));
288 1 50       4 if ($zero != 0) {
289 0         0 croak("can't parse 'update' message: no zero int32");
290             }
291              
292 1         2 my $coll;
293 1         2 ($data, $coll) = _decode_cstring($data);
294              
295 1         4 my $flags = _decode_flags(substr($data, 0, 4, ''), 'update');
296 1         4 my $docs = _decode_documents(\$data, $options);
297 1 50       4 if (scalar(@$docs) != 2) {
298 0         0 croak "update message should contains only 2 docs";
299             }
300              
301 1         4 my $res = {
302             fullCollectionName => $coll,
303             flags => $flags,
304             selector => $docs->[0],
305             update => $docs->[1],
306             };
307              
308 1         3 return $res;
309             }
310              
311             sub _decode_insert {
312 1     1   3 my ($self, $data_ref, $options) = @_;
313 1         2 my $data = $$data_ref;
314              
315 1         4 my $flags = _decode_flags(substr($data, 0, 4, ''), 'insert');
316 1         2 my $coll;
317 1         2 ($data, $coll) = _decode_cstring($data);
318 1         4 my $docs = _decode_documents(\$data, $options);
319              
320 1         10 my $res = {
321             flags => $flags,
322             fullCollectionName => $coll,
323             documents => $docs,
324             };
325              
326 1         2 return $res;
327             }
328              
329             sub _decode_query {
330 1     1   3 my ($self, $data_ref, $options) = @_;
331 1         2 my $data = $$data_ref;
332              
333 1         3 my $flags = _decode_flags(substr($data, 0, 4, ''), 'query');
334              
335 1         2 my $coll;
336 1         4 ($data, $coll) = _decode_cstring($data);
337              
338 1         4 my $numberToSkip = _decode_int32(substr($data, 0, 4, ''));
339 1         3 my $numberToReturn = _decode_int32(substr($data, 0, 4, ''));
340              
341 1         2 my $docs = _decode_documents(\$data, $options);
342 1 50       7 if (scalar(@$docs) !~ /^1|2$/) {
343 0         0 croak "query message should contains only 1 or 2 docs";
344             }
345              
346 1         5 my $res = {
347             flags => $flags,
348             fullCollectionName => $coll,
349             numberToSkip => $numberToSkip,
350             numberToReturn => $numberToReturn,
351             query => $docs->[0],
352             };
353 1 50       13 if ($docs->[1]) {
354 1         2 $res->{returnFieldSelector} = $docs->[1];
355             };
356              
357 1         3 return $res;
358             }
359              
360             sub _decode_getmore {
361 1     1   2 my ($self, $data_ref, $options) = @_;
362 1         3 my $data = $$data_ref;
363              
364 1         2 my $zero = _decode_int32(substr($data, 0, 4, ''));
365 1 50       4 if ($zero != 0) {
366 0         0 croak("can't parse 'getmore' message: no zero int32");
367             }
368              
369 1         1 my $coll;
370 1         3 ($data, $coll) = _decode_cstring($data);
371              
372 1         5 my $res = {
373             fullCollectionName => $coll,
374             numberToReturn => _decode_int32(substr($data, 0, 4, '')),
375             cursorID => _decode_int64(substr($data, 0, 8, '')),
376             };
377              
378 1         2 return $res;
379             }
380              
381             sub _decode_delete {
382 1     1   3 my ($self, $data_ref, $options) = @_;
383 1         2 my $data = $$data_ref;
384              
385 1         4 my $zero = _decode_int32(substr($data, 0, 4, ''));
386 1 50       4 if ($zero != 0) {
387 0         0 croak("can't parse 'delete' message: no zero int32");
388             }
389              
390 1         2 my $coll;
391 1         3 ($data, $coll) = _decode_cstring($data);
392              
393 1         5 my $flags = _decode_flags(substr($data, 0, 4, ''), 'delete');
394              
395 1         4 my $docs = _decode_documents(\$data, $options);
396 1 50       39 if (scalar(@$docs) != 1) {
397 0         0 croak "delete message should contains only 1 doc";
398             }
399              
400 1         5 my $res = {
401             fullCollectionName => $coll,
402             flags => $flags,
403             selector => $docs->[0],
404             };
405              
406 1         5 return $res;
407             }
408              
409             sub _decode_kill_cursors {
410 1     1   2 my ($self, $data_ref, $options) = @_;
411 1         2 my $data = $$data_ref;
412              
413 1         3 my $zero = _decode_int32(substr($data, 0, 4, ''));
414 1 50       4 if ($zero != 0) {
415 0         0 croak("can't parse 'kill_cursors' message: no zero int32");
416             }
417              
418 1         3 my $n_cursors = _decode_int32(substr($data, 0, 4, ''));
419 1 50       5 if (length($data) != $n_cursors * 8) {
420 0         0 croak("real number of cursors != number of cursors in the message");
421             }
422              
423 1         1 my @cursors;
424 1         3 while ($data) {
425 2         4 push @cursors, _decode_int64(substr($data, 0, 8, ''));
426             }
427              
428 1         3 my $res = {
429             numberOfCursorIDs => $n_cursors,
430             cursorIDs => \@cursors,
431             };
432              
433 1         2 return $res;
434             }
435              
436             sub _decode_msg {
437 1     1   2 my ($self, $data_ref, $options) = @_;
438 1         2 my $data = $$data_ref;
439              
440 1         1 my $msg;
441 1         3 ($data, $msg) = _decode_cstring($data);
442              
443 1 50       7 if (length($data) > 0) {
444 0         0 croak("can't parse 'msg' message: there are additional bytes at the end");
445             }
446              
447 1         2 return { message => $msg };
448             }
449              
450             sub _decode_reply {
451 1     1   3 my ($self, $data_ref, $options) = @_;
452 1         26 my $data = $$data_ref;
453              
454 1         6 my $res = {
455             responseFlags => _decode_flags(substr($data, 0, 4, ''), 'reply'),
456             cursorID => _decode_int64(substr($data, 0, 8, '')),
457             startingFrom => _decode_int32(substr($data, 0, 4, '')),
458             numberReturned => _decode_int32(substr($data, 0, 4, '')),
459             documents => _decode_documents(\$data, $options),
460             };
461              
462 1         5 return $res;
463             }
464              
465             sub _decode_header {
466 8     8   9 my $h = shift;
467              
468 8         12 my $messageLength = substr($h, 0, 4, '');
469 8         10 my $requestID = substr($h, 0, 4, '');
470 8         9 my $responseTo = substr($h, 0, 4, '');
471 8         12 my $opCode = substr($h, 0, 4, '');
472              
473 8         13 my $op_code_int = _decode_int32($opCode);
474 8         18 my $op_code_str = $OP_CODE2STR->{$op_code_int};
475 8 50       21 croak("Unknown op_code [$op_code_int]") unless defined($op_code_str);
476              
477 8         14 my $header = {
478             messageLength => _decode_int32($messageLength),
479             requestID => _decode_int32($requestID),
480             responseTo => _decode_int32($responseTo),
481             opCode => $op_code_str,
482             };
483              
484 8         17 return $header;
485             }
486              
487             sub _decode_int32 {
488 64     64   74 my $int32 = shift;
489 64         309 return unpack("l<", $int32);
490             }
491              
492             sub _decode_int64 {
493 4     4   3 my $int64 = shift;
494 4         16 return unpack("q<", $int64);
495             }
496              
497             sub _decode_flags {
498 5     5   7 my ($flags, $op_code_str) = @_;
499              
500 5         23 my $v = Bit::Vector->new(32);
501 5         9 $v->from_Dec(_decode_int32($flags));
502              
503 5         16 my $str_flags = {};
504 5         9 my $all_op_code_bits = $BIT2FLAG->{$op_code_str};
505 5         28 foreach my $reply_flag_bit (keys %$all_op_code_bits) {
506 15 100       191 if ($v->bit_test($reply_flag_bit)) {
507 4         7 my $reply_flag = $all_op_code_bits->{$reply_flag_bit};
508 4         11 $str_flags->{$reply_flag} = 1;
509             }
510             }
511              
512 5         27 return $str_flags;
513             }
514              
515             sub _decode_documents {
516 5     5   6 my ($data_ref, $options) = @_;
517 5         5 my %bson_options;
518 5 50 33     15 if ($options && $options->{ixhash}) {
519 0         0 %bson_options = (ixhash => 1);
520             }
521              
522 5         6 my $data = $$data_ref;
523              
524 5         6 my @docs = ();
525 5         11 while (length($data)) {
526 9         105 my $l = _decode_int32(substr($data, 0, 4));
527 9 50       22 if (length($data) < $l) {
528 0         0 croak "Incorrect length of bson document";
529             }
530 9         12 my $doc_str = substr($data, 0, $l, '');
531 9         27 push @docs, BSON::decode($doc_str, %bson_options);
532             }
533              
534 5         132 return \@docs;
535             }
536              
537             sub _decode_cstring {
538 6     6   9 my $data = shift;
539 6         9 my $idx = index($data, "\x00");
540 6 50       13 if ($idx < 0) {
541 0         0 croak("Can't find string terminator");
542             }
543              
544 6         8 my $string = substr($data, 0, $idx);
545 6         8 substr($data, 0, $idx + 1, '');
546              
547 6         16 return ($data, $string);
548             }
549              
550             sub _cstring {
551 6     6   9 my $string = shift;
552 6         34 return $string . "\x00";
553             }
554              
555             sub _int32 {
556 47     47   53 my $int32 = shift;
557 47         208 return pack("l<", $int32);
558             }
559              
560             sub _int64 {
561 4     4   4 my $int64 = shift;
562 4         34 return pack("q<", $int64);
563             }
564              
565             sub _flags {
566 5     5   7 my ($flags, $op_code_str) = @_;
567 5         34 my $v = Bit::Vector->new(32);
568              
569 5         11 my $all_op_code_flags = $FLAG2BIT->{$op_code_str};
570 5         22 while (my ($flag_name, $flag_value) = each %$flags) {
571 4 50 33     25 if (defined(my $flag_bit = $all_op_code_flags->{$flag_name}) && $flag_value) {
572 4         32 $v->Bit_On($flag_bit);
573             }
574             }
575              
576 5         40 return _int32($v->to_Dec);
577             }
578              
579             sub _documents {
580 7     7   9 my $d = shift;
581 7 50       15 return '' unless ($d);
582              
583 7         7 my @docs;
584 7 100       17 if (ref($d) eq 'HASH') {
    50          
585 5         7 @docs = ($d);
586             } elsif (ref($d) eq 'ARRAY') {
587 2         5 @docs = @$d;
588             }
589              
590 7         9 my $msg = '';
591 7         9 foreach my $doc (@docs) {
592 9         146 $msg .= BSON::encode($doc);
593             }
594              
595 7         271 return $msg;
596             }
597              
598             sub _with_header {
599 8     8   13 my ($self, $header, $msg_ref, $op) = @_;
600              
601 8         11 my $msg = $$msg_ref;
602 8         12 my $length = 4*4 + length($msg);
603              
604 8   100     11 my $h = _int32($length) . _int32($header->{requestID} || 0) .
      100        
605             _int32($header->{responseTo} || 0) . _int32($OP_CODES->{$op});
606              
607 8         54 return $h . $msg;
608             }
609              
610              
611             1;
612              
613             __END__