File Coverage

blib/lib/Mango/Protocol.pm
Criterion Covered Total %
statement 64 64 100.0
branch 34 36 94.4
condition n/a
subroutine 12 12 100.0
pod 8 8 100.0
total 118 120 98.3


line stmt bran cond sub pod time code
1             package Mango::Protocol;
2 10     10   198735 use Mojo::Base -base;
  10         31  
  10         95  
3              
4 10     10   2059 use Mango::BSON qw(bson_decode bson_encode bson_length encode_cstring);
  10         22  
  10         680  
5              
6             # Opcodes
7 10         10737 use constant {REPLY => 1, QUERY => 2004, GET_MORE => 2005,
8 10     10   64 KILL_CURSORS => 2007};
  10         18  
9              
10             sub build_get_more {
11 1     1 1 4 my ($self, $id, $name, $return, $cursor) = @_;
12              
13             # Zero and name
14 1         4 my $msg = pack('l<', 0) . encode_cstring($name);
15              
16             # Number to return and cursor id
17 1         6 $msg .= pack('l<', $return) . pack('q<', $cursor);
18              
19             # Header
20 1         3 return _build_header($id, length($msg), GET_MORE) . $msg;
21             }
22              
23             sub build_kill_cursors {
24 1     1 1 4 my ($self, $id) = (shift, shift);
25              
26             # Zero and number of cursor ids
27 1         5 my $msg = pack('l<', 0) . pack('l<', scalar @_);
28              
29             # Cursor ids
30 1         4 $msg .= pack 'q<', $_ for @_;
31              
32             # Header
33 1         4 return _build_header($id, length($msg), KILL_CURSORS) . $msg;
34             }
35              
36             sub build_query {
37 3     3 1 11 my ($self, $id, $name, $flags, $skip, $return, $query, $fields) = @_;
38              
39             # Flags
40 3         7 my $vec = pack 'B*', '0' x 32;
41 3 100       13 vec($vec, 1, 1) = 1 if $flags->{tailable_cursor};
42 3 100       10 vec($vec, 2, 1) = 1 if $flags->{slave_ok};
43 3 100       11 vec($vec, 4, 1) = 1 if $flags->{no_cursor_timeout};
44 3 100       9 vec($vec, 5, 1) = 1 if $flags->{await_data};
45 3 100       10 vec($vec, 6, 1) = 1 if $flags->{exhaust};
46 3 100       8 vec($vec, 7, 1) = 1 if $flags->{partial};
47 3         21 my $msg = pack 'l<', unpack('V', $vec);
48              
49             # Name
50 3         11 $msg .= encode_cstring $name;
51              
52             # Skip and number to return
53 3         9 $msg .= pack('l<', $skip) . pack('l<', $return);
54              
55             # Query
56 3         8 $msg .= bson_encode $query;
57              
58             # Optional field selector
59 3 50       10 $msg .= bson_encode $fields if $fields;
60              
61             # Header
62 3         11 return _build_header($id, length($msg), QUERY) . $msg;
63             }
64              
65             sub command_error {
66 2     2 1 5 my ($self, $doc) = @_;
67 2 100       11 return $doc->{ok} ? undef : $doc->{errmsg};
68             }
69              
70 3 100   3 1 115 sub next_id { $_[1] > 2147483646 ? 1 : $_[1] + 1 }
71              
72             sub parse_reply {
73 5     5 1 906 my ($self, $bufref) = @_;
74              
75             # Make sure we have the whole message
76 5 100       12 return undef unless my $len = bson_length $$bufref;
77 4 100       14 return undef if length $$bufref < $len;
78 3         7 my $msg = substr $$bufref, 0, $len, '';
79 3         7 substr $msg, 0, 4, '';
80              
81             # Header
82 3         8 my $id = unpack 'l<', substr($msg, 0, 4, '');
83 3         6 my $to = unpack 'l<', substr($msg, 0, 4, '');
84 3         7 my $op = unpack 'l<', substr($msg, 0, 4, '');
85 3 100       10 return undef unless $op == REPLY;
86              
87             # Flags
88 2         4 my $flags = {};
89 2         4 my $vec = substr $msg, 0, 4, '';
90 2 50       7 $flags->{cursor_not_found} = 1 if vec $vec, 0, 1;
91 2 100       6 $flags->{query_failure} = 1 if vec $vec, 1, 1;
92 2 100       6 $flags->{await_capable} = 1 if vec $vec, 3, 1;
93              
94             # Cursor id
95 2         6 my $cursor = unpack 'q<', substr($msg, 0, 8, '');
96              
97             # Starting from
98 2         5 my $from = unpack 'l<', substr($msg, 0, 4, '');
99              
100             # Documents (remove number of documents prefix)
101 2         4 substr $msg, 0, 4, '';
102 2         3 my @docs;
103 2         7 push @docs, bson_decode(substr $msg, 0, bson_length($msg), '') while $msg;
104              
105             return {
106 2         21 id => $id,
107             to => $to,
108             flags => $flags,
109             cursor => $cursor,
110             from => $from,
111             docs => \@docs
112             };
113             }
114              
115             sub query_failure {
116 3     3 1 10 my ($self, $reply) = @_;
117 3 100       12 return undef unless $reply;
118 2 100       13 return $reply->{flags}{query_failure} ? $reply->{docs}[0]{'$err'} : undef;
119             }
120              
121             sub write_error {
122 2     2 1 5 my ($self, $doc) = @_;
123 2 100       10 return undef unless my $errors = $doc->{writeErrors};
124             return join "\n",
125 1         3 map {"Write error at index $_->{index}: $_->{errmsg}"} @$errors;
  1         17  
126             }
127              
128             sub _build_header {
129 5     5   11 my ($id, $length, $op) = @_;
130 5         10 return join '', map { pack 'l<', $_ } $length + 16, $id, 0, $op;
  20         76  
131             }
132              
133             1;
134              
135             =encoding utf8
136              
137             =head1 NAME
138              
139             Mango::Protocol - The MongoDB wire protocol
140              
141             =head1 SYNOPSIS
142              
143             use Mango::Protocol;
144              
145             my $protocol = Mango::Protocol->new;
146             my $bytes = $protocol->query(1, 'foo', {}, 0, 10, {}, {});
147              
148             =head1 DESCRIPTION
149              
150             L is a minimalistic implementation of the MongoDB wire
151             protocol.
152              
153             =head1 METHODS
154              
155             L inherits all methods from L and implements the
156             following new ones.
157              
158             =head2 build_get_more
159              
160             my $bytes = $protocol->build_get_more($id, $name, $return, $cursor);
161              
162             Build message for C operation.
163              
164             =head2 build_kill_cursors
165              
166             my $bytes = $protocol->build_kill_cursors($id, @ids);
167              
168             Build message for C operation.
169              
170             =head2 build_query
171              
172             my $bytes = $protocol->build_query($id, $name, $flags, $skip, $return,
173             $query, $fields);
174              
175             Build message for C operation.
176              
177             =head2 command_error
178              
179             my $err = $protocol->command_error($doc);
180              
181             Check document for command error.
182              
183             =head2 next_id
184              
185             my $id = $protocol->next_id(23);
186              
187             Generate next id.
188              
189             =head2 parse_reply
190              
191             my $reply = $protocol->parse_reply(\$str);
192              
193             Extract and parse C message.
194              
195             =head2 query_failure
196              
197             my $err = $protocol->query_failure($reply);
198              
199             Check reply for query failure.
200              
201             =head2 write_error
202              
203             my $err = $protocol->write_error($doc);
204              
205             Check document for write error.
206              
207             =head1 SEE ALSO
208              
209             L, L, L.
210              
211             =cut