File Coverage

blib/lib/Mango.pm
Criterion Covered Total %
statement 36 209 17.2
branch 0 94 0.0
condition 1 71 1.4
subroutine 12 40 30.0
pod 7 7 100.0
total 56 421 13.3


line stmt bran cond sub pod time code
1             package Mango;
2 9     9   184912 use Mojo::Base 'Mojo::EventEmitter';
  9         14  
  9         45  
3              
4 9     9   11249 use Carp 'croak';
  9         11  
  9         315  
5 9     9   4307 use Hash::Util::FieldHash;
  9         5918  
  9         372  
6 9     9   3054 use Mango::BSON 'bson_doc';
  9         27  
  9         526  
7 9     9   3651 use Mango::Database;
  9         23  
  9         59  
8 9     9   3276 use Mango::Protocol;
  9         15  
  9         49  
9 9     9   238 use Mojo::IOLoop;
  9         9  
  9         39  
10 9     9   4025 use Mojo::URL;
  9         48133  
  9         57  
11 9     9   256 use Mojo::Util 'dumper';
  9         11  
  9         342  
12 9     9   35 use Scalar::Util 'weaken';
  9         11  
  9         437  
13              
14 9   50 9   35 use constant DEBUG => $ENV{MANGO_DEBUG} || 0;
  9         11  
  9         492  
15 9     9   32 use constant DEFAULT_PORT => 27017;
  9         10  
  9         22426  
16              
17             has default_db => 'admin';
18             has hosts => sub { [['localhost']] };
19             has [qw(inactivity_timeout j)] => 0;
20             has ioloop => sub { Mojo::IOLoop->new };
21             has max_bson_size => 16777216;
22             has max_connections => 5;
23             has [qw(max_write_batch_size wtimeout)] => 1000;
24             has protocol => sub { Mango::Protocol->new };
25             has w => 1;
26              
27             # Private variables are not visible in the object's dump. This
28             # is good for security.
29             Hash::Util::FieldHash::fieldhash my %AUTH;
30              
31             our $VERSION = '1.28';
32              
33 0     0     sub DESTROY { shift->_cleanup }
34              
35 0 0   0 1   sub backlog { scalar @{shift->{queue} || []} }
  0            
36              
37             sub db {
38 0     0 1   my ($self, $name) = @_;
39 0   0       $name //= $self->default_db;
40 0           my $db = Mango::Database->new(mango => $self, name => $name);
41 0           weaken $db->{mango};
42 0           return $db;
43             }
44              
45             sub from_string {
46 0     0 1   my ($self, $str) = @_;
47              
48             # Protocol
49 0 0         return $self unless $str;
50 0           my $url = Mojo::URL->new($str);
51 0 0         croak qq{Invalid MongoDB connection string "$str"}
52             unless $url->protocol eq 'mongodb';
53              
54             # Hosts
55 0           my @hosts;
56             /^([^,:]+)(?::(\d+))?/ and push @hosts, $2 ? [$1, $2] : [$1]
57 0 0 0       for split /,/, join(':', map { $_ // '' } $url->host, $url->port);
  0   0        
58 0 0         $self->hosts(\@hosts) if @hosts;
59              
60             # Database
61 0 0         if (my $db = $url->path->parts->[0]) { $self->default_db($db) }
  0            
62              
63             # User and password
64 0 0 0       if (($url->userinfo // '') =~ /^([^:]+):([^:]+)$/) {
65 0           require Mango::Auth::SCRAM;
66 0           $self->_auth(Mango::Auth::SCRAM->new)
67             ->_auth->_credentials([$self->default_db, $1, $2]);
68             }
69              
70             # Options
71 0           my $query = $url->query;
72 0 0         if (my $j = $query->param('journal')) { $self->j($j) }
  0            
73 0 0         if (my $w = $query->param('w')) { $self->w($w) }
  0            
74 0 0         if (my $timeout = $query->param('wtimeoutMS')) { $self->wtimeout($timeout) }
  0            
75              
76 0           return $self;
77             }
78              
79 0     0 1   sub get_more { shift->_op('get_more', 1, @_) }
80              
81 0     0 1   sub kill_cursors { shift->_op('kill_cursors', 0, @_) }
82              
83 0     0 1   sub new { shift->SUPER::new->from_string(@_) }
84              
85 0     0 1   sub query { shift->_op('query', 1, @_) }
86              
87             sub _auth {
88 0     0     my ($self, $mode) = @_;
89 0 0         return $AUTH{$self} unless $mode;
90              
91 0           $AUTH{$self} = $mode;
92 0           $AUTH{$self}->mango($self);
93 0           weaken $AUTH{$self}->{mango};
94 0           return $self;
95             }
96              
97             sub _build {
98 0     0     my ($self, $name) = (shift, shift);
99 0           my $next = $self->_id;
100 0           warn "-- Operation #$next ($name)\n@{[dumper [@_]]}" if DEBUG;
101 0           my $method = "build_$name";
102 0           return ($next, $self->protocol->$method($next, @_));
103             }
104              
105             sub _cleanup {
106 0     0     my $self = shift;
107 0 0         return unless $self->_loop(0);
108              
109             # Clean up connections
110 0           delete $self->{pid};
111 0           my $connections = delete $self->{connections};
112 0           for my $c (keys %$connections) {
113 0           my $loop = $self->_loop($connections->{$c}{nb});
114 0 0         $loop->remove($c) if $loop;
115             }
116              
117             # Clean up active operations
118 0   0       my $queue = delete $self->{queue} || [];
119             $_->{last} && !$_->{start} && unshift @$queue, $_->{last}
120 0   0       for values %$connections;
      0        
121 0           $self->_finish(undef, $_->{cb}, 'Premature connection close') for @$queue;
122             }
123              
124             sub _close {
125 0     0     my ($self, $id) = @_;
126              
127 0 0         return unless my $c = delete $self->{connections}{$id};
128 0           my $last = $c->{last};
129 0 0         $self->_finish(undef, $last->{cb}, 'Premature connection close') if $last;
130 0 0         $self->_connect($c->{nb}) if @{$self->{queue}};
  0            
131             }
132              
133             sub _connect {
134 0     0     my ($self, $nb, $hosts) = @_;
135              
136 0   0       my ($host, $port) = @{shift @{$hosts ||= [@{$self->hosts}]}};
  0            
  0            
  0            
137 0           weaken $self;
138 0           my $id;
139             $id = $self->_loop($nb)->client(
140             {address => $host, port => $port //= DEFAULT_PORT} => sub {
141 0     0     my ($loop, $err, $stream) = @_;
142              
143             # Connection error (try next server)
144 0 0         if ($err) {
145 0 0         return $self->_error($id, $err) unless @$hosts;
146 0           delete $self->{connections}{$id};
147 0           return $self->_connect($nb, $hosts);
148             }
149              
150             # Connection established
151 0           $stream->timeout($self->inactivity_timeout);
152 0 0         $stream->on(close => sub { $self && $self->_close($id) });
  0            
153 0 0         $stream->on(error => sub { $self && $self->_error($id, pop) });
  0            
154 0           $stream->on(read => sub { $self->_read($id, pop) });
  0            
155              
156             # Check node information with "isMaster" command
157 0           my $cb = sub { shift->_master($id, $nb, $hosts, pop) };
  0            
158 0           $self->_fast($id, $self->default_db, {isMaster => 1}, $cb);
159             }
160 0   0       );
161 0           $self->{connections}{$id} = { nb => $nb, start => 1 };
162              
163 0           my $num = scalar keys %{$self->{connections}};
  0            
164 0           warn "-- New connection ($host:$port:$num)\n" if DEBUG;
165             }
166              
167             sub _error {
168 0     0     my ($self, $id, $err) = @_;
169              
170 0 0         return unless my $c = delete $self->{connections}{$id};
171 0           $self->_loop($c->{nb})->remove($id);
172              
173 0   0       my $last = $c->{last} // shift @{$self->{queue}};
  0            
174 0 0         $self->_finish(undef, $last->{cb}, $err) if $last;
175             }
176              
177             sub _fast {
178 0     0     my ($self, $id, $db, $command, $cb) = @_;
179              
180             # Handle errors
181             my $wrapper = sub {
182 0     0     my ($self, $err, $reply) = @_;
183              
184 0           my $doc = $reply->{docs}[0];
185 0   0       $err ||= $self->protocol->command_error($doc);
186 0 0         return $self->$cb(undef, $doc) unless $err;
187              
188 0 0         return unless my $last = shift @{$self->{queue}};
  0            
189 0           $self->_finish(undef, $last->{cb}, $err);
190 0           };
191              
192             # Skip the queue and run command right away
193 0           my ($next, $msg)
194             = $self->_build('query', "$db.\$cmd", {}, 0, -1, $command, {});
195             $self->{connections}{$id}{fast}
196 0           = {id => $next, safe => 1, msg => $msg, cb => $wrapper};
197 0           $self->_next;
198             }
199              
200             sub _finish {
201 0     0     my ($self, $reply, $cb, $err) = @_;
202 0   0       $self->$cb($err || $self->protocol->query_failure($reply), $reply);
203             }
204              
205 0   0 0     sub _id { $_[0]{id} = $_[0]->protocol->next_id($_[0]{id} // 0) }
206              
207 0 0   0     sub _loop { $_[1] ? Mojo::IOLoop->singleton : $_[0]->ioloop }
208              
209             sub _master {
210 0     0     my ($self, $id, $nb, $hosts, $doc) = @_;
211              
212             # Check version
213             return $self->_error($id, 'MongoDB version 3.0 required')
214 0 0 0       unless ($doc->{maxWireVersion} || 0) >= 3;
215              
216             # Continue with authentication if we are connected to the primary
217 0 0         if ($doc->{ismaster}) {
218 0 0         return $self->_auth
219             ? $self->_auth->_authenticate($id)
220             : $self->emit(connection => $id)->_next;
221             }
222              
223             # Get primary and try to connect again
224 0 0 0       unshift @$hosts, [$1, $2] if ($doc->{primary} // '') =~ /^(.+):(\d+)$/;
225 0 0         return $self->_error($id, "Couldn't find primary node") unless @$hosts;
226 0           delete $self->{connections}{$id};
227 0           $self->_loop($nb)->remove($id);
228 0           $self->_connect($nb, $hosts);
229             }
230              
231             sub _next {
232 0     0     my ($self, $op) = @_;
233              
234             # Make sure all connections are saturated
235 0 0 0       push @{$self->{queue} ||= []}, $op if $op;
  0            
236 0           my $connections = $self->{connections};
237 0           my $start;
238 0   0       $self->_write($_) and $start++ for keys %$connections;
239              
240             # Check if we need a blocking connection
241 0 0         return unless $op;
242 0           my @ids = keys %$connections;
243             return $self->_connect(0)
244 0 0 0       if !$op->{nb} && !grep { !$connections->{$_}{nb} } @ids;
  0            
245              
246             # Check if we need more non-blocking connections
247             $self->_connect(1)
248 0 0 0       if !$start && @{$self->{queue}} && @ids < $self->max_connections;
  0   0        
249             }
250              
251             sub _op {
252 0     0     my ($self, $op, $safe) = (shift, shift, shift);
253 0 0         my $cb = ref $_[-1] eq 'CODE' ? pop : undef;
254 0           my ($next, $msg) = $self->_build($op, @_);
255 0           $self->_start(
256             {id => $next, safe => $safe, msg => $msg, nb => !!$cb, cb => $cb});
257             }
258              
259             sub _read {
260 0     0     my ($self, $id, $chunk) = @_;
261              
262 0           my $c = $self->{connections}{$id};
263 0           $c->{buffer} .= $chunk;
264 0           while (my $reply = $self->protocol->parse_reply(\$c->{buffer})) {
265 0           warn "-- Client <<< Server (#$reply->{to})\n@{[dumper $reply]}" if DEBUG;
266 0 0         next unless $reply->{to} == $c->{last}{id};
267 0           $self->_finish($reply, (delete $c->{last})->{cb});
268             }
269 0           $self->_next;
270             }
271              
272             sub _start {
273 0     0     my ($self, $op) = @_;
274              
275             # Fork safety
276 0 0 0       $self->_cleanup unless ($self->{pid} //= $$) eq $$;
277              
278             # Non-blocking
279 0 0         return $self->_next($op) if $op->{cb};
280              
281             # Blocking
282 0           my ($err, $reply);
283 0     0     $op->{cb} = sub { shift->ioloop->stop; ($err, $reply) = @_ };
  0            
  0            
284 0           $self->_next($op);
285 0           $self->ioloop->start;
286 0 0         return $err ? croak $err : $reply;
287             }
288              
289             sub _write {
290 0     0     my ($self, $id) = @_;
291              
292             # Make sure connection has not been corrupted while event loop was stopped
293 0           my $c = $self->{connections}{$id};
294 0 0         return $c->{start} if $c->{last};
295 0           my $loop = $self->_loop($c->{nb});
296 0 0         return undef unless my $stream = $loop->stream($id);
297 0 0 0       if (!$loop->is_running && $stream->is_readable) {
298 0           $stream->close;
299 0           return undef;
300             }
301              
302             # Fast operation
303 0 0         delete $c->{start} unless my $last = delete $c->{fast};
304              
305             # Blocking operations have a higher precedence
306             return $c->{start}
307 0 0 0       unless $last || ($c->{nb} xor !($self->{queue}->[-1] || {})->{nb});
      0        
      0        
308 0 0 0       $last ||= $c->{nb} ? shift @{$self->{queue}} : pop @{$self->{queue}};
  0            
  0            
309              
310 0 0         return $c->{start} unless $c->{last} = $last;
311 0           warn "-- Client >>> Server (#$last->{id})\n" if DEBUG;
312 0           $stream->write(delete $last->{msg});
313              
314             # Unsafe operations are done when they are written
315 0 0         return $c->{start} if $last->{safe};
316 0           weaken $self;
317 0     0     $stream->write('', sub { $self->_finish(undef, delete($c->{last})->{cb}) });
  0            
318 0           return $c->{start};
319             }
320              
321             1;
322              
323             =encoding utf8
324              
325             =head1 NAME
326              
327             Mango - Pure-Perl non-blocking I/O MongoDB driver
328              
329             =head1 SYNOPSIS
330              
331             use Mango;
332              
333             # Insert document
334             my $mango = Mango->new('mongodb://localhost:27017');
335             my $oid = $mango->db('test')->collection('foo')->insert({bar => 'baz'});
336              
337             # Find document
338             my $doc = $mango->db('test')->collection('foo')->find_one({bar => 'baz'});
339             say $doc->{bar};
340              
341             # Update document
342             $mango->db('test')->collection('foo')
343             ->update({bar => 'baz'}, {bar => 'yada'});
344              
345             # Remove document
346             $mango->db('test')->collection('foo')->remove({bar => 'yada'});
347              
348             # Insert document with special BSON types
349             use Mango::BSON ':bson';
350             my $oid = $mango->db('test')->collection('foo')
351             ->insert({data => bson_bin("\x00\x01"), now => bson_time});
352              
353             # Non-blocking concurrent find
354             my $delay = Mojo::IOLoop->delay(sub {
355             my ($delay, @docs) = @_;
356             ...
357             });
358             for my $name (qw(sri marty)) {
359             my $end = $delay->begin(0);
360             $mango->db('test')->collection('users')->find({name => $name})->all(sub {
361             my ($cursor, $err, $docs) = @_;
362             $end->(@$docs);
363             });
364             }
365             $delay->wait;
366              
367             # Event loops such as AnyEvent are supported through EV
368             use EV;
369             use AnyEvent;
370             my $cv = AE::cv;
371             $mango->db('test')->command(buildInfo => sub {
372             my ($db, $err, $doc) = @_;
373             $cv->send($doc->{version});
374             });
375             say $cv->recv;
376              
377             =head1 DESCRIPTION
378              
379             L<Mango> is a pure-Perl non-blocking I/O MongoDB driver, optimized for use
380             with the L<Mojolicious> real-time web framework, and with multiple event loop
381             support. Since MongoDB is still changing rapidly, only the latest stable
382             version is supported.
383              
384             For MongoDB 2.6 support, use L<Mango> 1.16.
385              
386             To learn more about MongoDB you should take a look at the
387             L<official documentation|http://docs.mongodb.org>, the documentation included
388             in this distribution is no replacement for it.
389              
390             Look at L<Mango::Collection> for CRUD operations.
391              
392             Many arguments passed to methods as well as values of attributes get
393             serialized to BSON with L<Mango::BSON>, which provides many helper functions
394             you can use to generate data types that are not available natively in Perl.
395             All connections will be reset automatically if a new process has been forked,
396             this allows multiple processes to share the same L<Mango> object safely.
397              
398             For better scalability (epoll, kqueue) and to provide IPv6, SOCKS5 as well as
399             TLS support, the optional modules L<EV> (4.0+), L<IO::Socket::IP> (0.20+),
400             L<IO::Socket::Socks> (0.64+) and L<IO::Socket::SSL> (1.84+) will be used
401             automatically if they are installed. Individual features can also be disabled
402             with the C<MOJO_NO_IPV6>, C<MOJO_NO_SOCKS> and C<MOJO_NO_TLS> environment
403             variables.
404              
405             =head1 EVENTS
406              
407             L<Mango> inherits all events from L<Mojo::EventEmitter> and can emit the
408             following new ones.
409              
410             =head2 connection
411              
412             $mango->on(connection => sub {
413             my ($mango, $id) = @_;
414             ...
415             });
416              
417             Emitted when a new connection has been established.
418              
419             =head1 ATTRIBUTES
420              
421             L<Mango> implements the following attributes.
422              
423             =head2 default_db
424              
425             my $name = $mango->default_db;
426             $mango = $mango->default_db('test');
427              
428             Default database, defaults to C<admin>.
429              
430             =head2 hosts
431              
432             my $hosts = $mango->hosts;
433             $mango = $mango->hosts([['localhost', 3000], ['localhost', 4000]]);
434              
435             Servers to connect to, defaults to C<localhost> and port C<27017>.
436              
437             =head2 inactivity_timeout
438              
439             my $timeout = $mango->inactivity_timeout;
440             $mango = $mango->inactivity_timeout(15);
441              
442             Maximum amount of time in seconds a connection can be inactive before getting
443             closed, defaults to C<0>. Setting the value to C<0> will allow connections to
444             be inactive indefinitely.
445              
446             =head2 ioloop
447              
448             my $loop = $mango->ioloop;
449             $mango = $mango->ioloop(Mojo::IOLoop->new);
450              
451             Event loop object to use for blocking I/O operations, defaults to a
452             L<Mojo::IOLoop> object.
453              
454             =head2 j
455              
456             my $j = $mango->j;
457             $mango = $mango->j(1);
458              
459             Wait for all operations to have reached the journal, defaults to C<0>.
460              
461             =head2 max_bson_size
462              
463             my $max = $mango->max_bson_size;
464             $mango = $mango->max_bson_size(16777216);
465              
466             Maximum size for BSON documents in bytes, defaults to C<16777216> (16MB).
467              
468             =head2 max_connections
469              
470             my $max = $mango->max_connections;
471             $mango = $mango->max_connections(5);
472              
473             Maximum number of connections to use for non-blocking operations, defaults to
474             C<5>.
475              
476             =head2 max_write_batch_size
477              
478             my $max = $mango->max_write_batch_size;
479             $mango = $mango->max_write_batch_size(1000);
480              
481             Maximum number of write operations to batch together, defaults to C<1000>.
482              
483             =head2 protocol
484              
485             my $protocol = $mango->protocol;
486             $mango = $mango->protocol(Mango::Protocol->new);
487              
488             Protocol handler, defaults to a L<Mango::Protocol> object.
489              
490             =head2 w
491              
492             my $w = $mango->w;
493             $mango = $mango->w(2);
494              
495             Wait for all operations to have reached at least this many servers, C<1>
496             indicates just primary, C<2> indicates primary and at least one secondary,
497             defaults to C<1>.
498              
499             =head2 wtimeout
500              
501             my $timeout = $mango->wtimeout;
502             $mango = $mango->wtimeout(1);
503              
504             Timeout for write propagation in milliseconds, defaults to C<1000>.
505              
506             =head1 METHODS
507              
508             L<Mango> inherits all methods from L<Mojo::EventEmitter> and implements the following
509             new ones.
510              
511             =head2 backlog
512              
513             my $num = $mango->backlog;
514              
515             Number of queued operations that have not yet been assigned to a connection.
516              
517             =head2 db
518              
519             my $db = $mango->db;
520             my $db = $mango->db('test');
521              
522             Build L<Mango::Database> object for database, uses L</"default_db"> if no name
523             is provided. Note that the reference L<Mango::Database/"mango"> is weakened,
524             so the L<Mango> object needs to be referenced elsewhere as well.
525              
526             =head2 from_string
527              
528             $mango
529             = $mango->from_string('mongodb://sri:s3cret@localhost:3000/test?w=2');
530              
531             Parse configuration from connection string.
532              
533             =head2 get_more
534              
535             my $reply = $mango->get_more($namespace, $return, $cursor);
536              
537             Perform low level C<GET_MORE> operation. You can also append a callback to
538             perform operation non-blocking.
539              
540             $mango->get_more(($namespace, $return, $cursor) => sub {
541             my ($mango, $err, $reply) = @_;
542             ...
543             });
544             Mojo::IOLoop->start unless Mojo::IOLoop->is_running;
545              
546             =head2 kill_cursors
547              
548             $mango->kill_cursors(@ids);
549              
550             Perform low level C<KILL_CURSORS> operation. You can also append a callback to
551             perform operation non-blocking.
552              
553             $mango->kill_cursors(@ids => sub {
554             my ($mango, $err) = @_;
555             ...
556             });
557             Mojo::IOLoop->start unless Mojo::IOLoop->is_running;
558              
559             =head2 new
560              
561             my $mango = Mango->new;
562             my $mango = Mango->new('mongodb://sri:s3cret@localhost:3000/test?w=2');
563              
564             Construct a new L<Mango> object and parse connection string with
565             L</"from_string"> if necessary.
566              
567             If a username and password are provided, Mango will try to authenticate using
568             SCRAM-SHA1. B<Warning:> this will require L<Authen::SCRAM> which is not
569             installed by default.
570              
571             =head2 query
572              
573             my $reply
574             = $mango->query($namespace, $flags, $skip, $return, $query, $fields);
575              
576             Perform low level C<QUERY> operation. You can also append a callback to
577             perform operation non-blocking.
578              
579             $mango->query(($namespace, $flags, $skip, $return, $query, $fields) => sub {
580             my ($mango, $err, $reply) = @_;
581             ...
582             });
583             Mojo::IOLoop->start unless Mojo::IOLoop->is_running;
584              
585             =head1 DEBUGGING
586              
587             You can set the C<MANGO_DEBUG> environment variable to get some advanced
588             diagnostics information printed to C<STDERR>.
589              
590             MANGO_DEBUG=1
591              
592             =head1 SPONSORS
593              
594             Some of the work on this distribution has been sponsored by
595             L<Drip Depot|http://www.dripdepot.com>, thank you!
596              
597             =head1 AUTHOR
598              
599             Sebastian Riedel, C<sri@cpan.org>.
600              
601             Current maintainer: Olivier Duclos C<odc@cpan.org>.
602              
603             =head1 CREDITS
604              
605             In alphabetical order:
606              
607             =over 2
608              
609             alexbyk
610              
611             Andrey Khozov
612              
613             Colin Cyr
614              
615             =back
616              
617             =head1 COPYRIGHT AND LICENSE
618              
619             Copyright (C) 2013-2014, Sebastian Riedel.
620              
621             This program is free software, you can redistribute it and/or modify it under
622             the terms of the Artistic License version 2.0.
623              
624             =head1 SEE ALSO
625              
626             L<https://github.com/oliwer/mango>, L<Mojolicious::Guides>,
627             L<http://mojolicio.us>.
628              
629             =cut