File Coverage

blib/lib/Mango.pm
Criterion Covered Total %
statement 36 209 17.2
branch 0 94 0.0
condition 1 71 1.4
subroutine 12 40 30.0
pod 7 7 100.0
total 56 421 13.3


line stmt bran cond sub pod time code
1             package Mango;
2 9     9   182327 use Mojo::Base 'Mojo::EventEmitter';
  9         14  
  9         43  
3              
4 9     9   11583 use Carp 'croak';
  9         12  
  9         321  
5 9     9   4342 use Hash::Util::FieldHash;
  9         5664  
  9         392  
6 9     9   3124 use Mango::BSON 'bson_doc';
  9         21  
  9         517  
7 9     9   3250 use Mango::Database;
  9         23  
  9         60  
8 9     9   3196 use Mango::Protocol;
  9         19  
  9         53  
9 9     9   210 use Mojo::IOLoop;
  9         13  
  9         39  
10 9     9   3770 use Mojo::URL;
  9         45841  
  9         53  
11 9     9   249 use Mojo::Util 'dumper';
  9         10  
  9         391  
12 9     9   34 use Scalar::Util 'weaken';
  9         11  
  9         378  
13              
14 9   50 9   28 use constant DEBUG => $ENV{MANGO_DEBUG} || 0;
  9         10  
  9         494  
15 9     9   36 use constant DEFAULT_PORT => 27017;
  9         9  
  9         21069  
16              
17             has default_db => 'admin';
18             has hosts => sub { [['localhost']] };
19             has [qw(inactivity_timeout j)] => 0;
20             has ioloop => sub { Mojo::IOLoop->new };
21             has max_bson_size => 16777216;
22             has max_connections => 5;
23             has [qw(max_write_batch_size wtimeout)] => 1000;
24             has protocol => sub { Mango::Protocol->new };
25             has w => 1;
26              
27             # Private variables are not visible in the object's dump. This
28             # is good for security.
29             Hash::Util::FieldHash::fieldhash my %AUTH;
30              
31             our $VERSION = '1.29';
32              
33 0     0     sub DESTROY { shift->_cleanup }
34              
35 0 0   0 1   sub backlog { scalar @{shift->{queue} || []} }
  0            
36              
37             sub db {
38 0     0 1   my ($self, $name) = @_;
39 0   0       $name //= $self->default_db;
40 0           my $db = Mango::Database->new(mango => $self, name => $name);
41 0           weaken $db->{mango};
42 0           return $db;
43             }
44              
45             sub from_string {
46 0     0 1   my ($self, $str) = @_;
47              
48             # Protocol
49 0 0         return $self unless $str;
50 0           my $url = Mojo::URL->new($str);
51 0 0         croak qq{Invalid MongoDB connection string "$str"}
52             unless $url->protocol eq 'mongodb';
53              
54             # Hosts
55 0           my @hosts;
56             /^([^,:]+)(?::(\d+))?/ and push @hosts, $2 ? [$1, $2] : [$1]
57 0 0 0       for split /,/, join(':', map { $_ // '' } $url->host, $url->port);
  0   0        
58 0 0         $self->hosts(\@hosts) if @hosts;
59              
60             # Database
61 0 0         if (my $db = $url->path->parts->[0]) { $self->default_db($db) }
  0            
62              
63             # User and password
64 0 0 0       if (($url->userinfo // '') =~ /^([^:]+):([^:]+)$/) {
65 0           require Mango::Auth::SCRAM;
66 0           $self->_auth(Mango::Auth::SCRAM->new)
67             ->_auth->_credentials([$self->default_db, $1, $2]);
68             }
69              
70             # Options
71 0           my $query = $url->query;
72 0 0         if (my $j = $query->param('journal')) { $self->j($j) }
  0            
73 0 0         if (my $w = $query->param('w')) { $self->w($w) }
  0            
74 0 0         if (my $timeout = $query->param('wtimeoutMS')) { $self->wtimeout($timeout) }
  0            
75              
76 0           return $self;
77             }
78              
79 0     0 1   sub get_more { shift->_op('get_more', 1, @_) }
80              
81 0     0 1   sub kill_cursors { shift->_op('kill_cursors', 0, @_) }
82              
83 0     0 1   sub new { shift->SUPER::new->from_string(@_) }
84              
85 0     0 1   sub query { shift->_op('query', 1, @_) }
86              
87             sub _auth {
88 0     0     my ($self, $mode) = @_;
89 0 0         return $AUTH{$self} unless $mode;
90              
91 0           $AUTH{$self} = $mode;
92 0           $AUTH{$self}->mango($self);
93 0           weaken $AUTH{$self}->{mango};
94 0           return $self;
95             }
96              
97             sub _build {
98 0     0     my ($self, $name) = (shift, shift);
99 0           my $next = $self->_id;
100 0           warn "-- Operation #$next ($name)\n@{[dumper [@_]]}" if DEBUG;
101 0           my $method = "build_$name";
102 0           return ($next, $self->protocol->$method($next, @_));
103             }
104              
105             sub _cleanup {
106 0     0     my $self = shift;
107 0 0         return unless $self->_loop(0);
108              
109             # Clean up connections
110 0           delete $self->{pid};
111 0           my $connections = delete $self->{connections};
112 0           for my $c (keys %$connections) {
113 0           my $loop = $self->_loop($connections->{$c}{nb});
114 0 0         $loop->remove($c) if $loop;
115             }
116              
117             # Clean up active operations
118 0   0       my $queue = delete $self->{queue} || [];
119             $_->{last} && !$_->{start} && unshift @$queue, $_->{last}
120 0   0       for values %$connections;
      0        
121 0           $self->_finish(undef, $_->{cb}, 'Premature connection close') for @$queue;
122             }
123              
124             sub _close {
125 0     0     my ($self, $id) = @_;
126              
127 0 0         return unless my $c = delete $self->{connections}{$id};
128 0           my $last = $c->{last};
129 0 0         $self->_finish(undef, $last->{cb}, 'Premature connection close') if $last;
130 0 0         $self->_connect($c->{nb}) if @{$self->{queue}};
  0            
131             }
132              
133             sub _connect {
134 0     0     my ($self, $nb, $hosts) = @_;
135              
136 0   0       my ($host, $port) = @{shift @{$hosts ||= [@{$self->hosts}]}};
  0            
  0            
  0            
137 0           weaken $self;
138 0           my $id;
139             $id = $self->_loop($nb)->client(
140             {address => $host, port => $port //= DEFAULT_PORT} => sub {
141 0     0     my ($loop, $err, $stream) = @_;
142              
143             # Connection error (try next server)
144 0 0         if ($err) {
145 0 0         return $self->_error($id, $err) unless @$hosts;
146 0           delete $self->{connections}{$id};
147 0           return $self->_connect($nb, $hosts);
148             }
149              
150             # Connection established
151 0           $stream->timeout($self->inactivity_timeout);
152 0 0         $stream->on(close => sub { $self && $self->_close($id) });
  0            
153 0 0         $stream->on(error => sub { $self && $self->_error($id, pop) });
  0            
154 0           $stream->on(read => sub { $self->_read($id, pop) });
  0            
155              
156             # Check node information with "isMaster" command
157 0           my $cb = sub { shift->_master($id, $nb, $hosts, pop) };
  0            
158 0           $self->_fast($id, $self->default_db, {isMaster => 1}, $cb);
159             }
160 0   0       );
161 0           $self->{connections}{$id} = { nb => $nb, start => 1 };
162              
163 0           my $num = scalar keys %{$self->{connections}};
  0            
164 0           warn "-- New connection ($host:$port:$num)\n" if DEBUG;
165             }
166              
167             sub _error {
168 0     0     my ($self, $id, $err) = @_;
169              
170 0 0         return unless my $c = delete $self->{connections}{$id};
171 0           $self->_loop($c->{nb})->remove($id);
172              
173 0   0       my $last = $c->{last} // shift @{$self->{queue}};
  0            
174 0 0         $self->_finish(undef, $last->{cb}, $err) if $last;
175             }
176              
177             sub _fast {
178 0     0     my ($self, $id, $db, $command, $cb) = @_;
179              
180             # Handle errors
181             my $wrapper = sub {
182 0     0     my ($self, $err, $reply) = @_;
183              
184 0           my $doc = $reply->{docs}[0];
185 0   0       $err ||= $self->protocol->command_error($doc);
186 0 0         return $self->$cb(undef, $doc) unless $err;
187              
188 0 0         return unless my $last = shift @{$self->{queue}};
  0            
189 0           $self->_finish(undef, $last->{cb}, $err);
190 0           };
191              
192             # Skip the queue and run command right away
193 0           my ($next, $msg)
194             = $self->_build('query', "$db.\$cmd", {}, 0, -1, $command, {});
195             $self->{connections}{$id}{fast}
196 0           = {id => $next, safe => 1, msg => $msg, cb => $wrapper};
197 0           $self->_next;
198             }
199              
200             sub _finish {
201 0     0     my ($self, $reply, $cb, $err) = @_;
202 0   0       $self->$cb($err || $self->protocol->query_failure($reply), $reply);
203             }
204              
205 0   0 0     sub _id { $_[0]{id} = $_[0]->protocol->next_id($_[0]{id} // 0) }
206              
207 0 0   0     sub _loop { $_[1] ? Mojo::IOLoop->singleton : $_[0]->ioloop }
208              
209             sub _master {
210 0     0     my ($self, $id, $nb, $hosts, $doc) = @_;
211              
212             # Check version
213             return $self->_error($id, 'MongoDB version 3.0 required')
214 0 0 0       unless ($doc->{maxWireVersion} || 0) >= 3;
215              
216             # Continue with authentication if we are connected to the primary
217 0 0         if ($doc->{ismaster}) {
218 0 0         return $self->_auth
219             ? $self->_auth->_authenticate($id)
220             : $self->emit(connection => $id)->_next;
221             }
222              
223             # Get primary and try to connect again
224 0 0 0       unshift @$hosts, [$1, $2] if ($doc->{primary} // '') =~ /^(.+):(\d+)$/;
225 0 0         return $self->_error($id, "Couldn't find primary node") unless @$hosts;
226 0           delete $self->{connections}{$id};
227 0           $self->_loop($nb)->remove($id);
228 0           $self->_connect($nb, $hosts);
229             }
230              
231             sub _next {
232 0     0     my ($self, $op) = @_;
233              
234             # Make sure all connections are saturated
235 0 0 0       push @{$self->{queue} ||= []}, $op if $op;
  0            
236 0           my $connections = $self->{connections};
237 0           my $start;
238 0   0       $self->_write($_) and $start++ for keys %$connections;
239              
240             # Check if we need a blocking connection
241 0 0         return unless $op;
242 0           my @ids = keys %$connections;
243             return $self->_connect(0)
244 0 0 0       if !$op->{nb} && !grep { !$connections->{$_}{nb} } @ids;
  0            
245              
246             # Check if we need more non-blocking connections
247             $self->_connect(1)
248 0 0 0       if !$start && @{$self->{queue}} && @ids < $self->max_connections;
  0   0        
249             }
250              
251             sub _op {
252 0     0     my ($self, $op, $safe) = (shift, shift, shift);
253 0 0         my $cb = ref $_[-1] eq 'CODE' ? pop : undef;
254 0           my ($next, $msg) = $self->_build($op, @_);
255 0           $self->_start(
256             {id => $next, safe => $safe, msg => $msg, nb => !!$cb, cb => $cb});
257             }
258              
259             sub _read {
260 0     0     my ($self, $id, $chunk) = @_;
261              
262 0           my $c = $self->{connections}{$id};
263 0           $c->{buffer} .= $chunk;
264 0           while (my $reply = $self->protocol->parse_reply(\$c->{buffer})) {
265 0           warn "-- Client <<< Server (#$reply->{to})\n@{[dumper $reply]}" if DEBUG;
266 0 0         next unless $reply->{to} == $c->{last}{id};
267 0           $self->_finish($reply, (delete $c->{last})->{cb});
268             }
269 0           $self->_next;
270             }
271              
272             sub _start {
273 0     0     my ($self, $op) = @_;
274              
275             # Fork safety
276 0 0 0       $self->_cleanup unless ($self->{pid} //= $$) eq $$;
277              
278             # Non-blocking
279 0 0         return $self->_next($op) if $op->{cb};
280              
281             # Blocking
282 0           my ($err, $reply);
283 0     0     $op->{cb} = sub { shift->ioloop->stop; ($err, $reply) = @_ };
  0            
  0            
284 0           $self->_next($op);
285 0           $self->ioloop->start;
286 0 0         return $err ? croak $err : $reply;
287             }
288              
289             sub _write {
290 0     0     my ($self, $id) = @_;
291              
292             # Make sure connection has not been corrupted while event loop was stopped
293 0           my $c = $self->{connections}{$id};
294 0 0         return $c->{start} if $c->{last};
295 0           my $loop = $self->_loop($c->{nb});
296 0 0         return undef unless my $stream = $loop->stream($id);
297 0 0 0       if (!$loop->is_running && $stream->is_readable) {
298 0           $stream->close;
299 0           return undef;
300             }
301              
302             # Fast operation
303 0 0         delete $c->{start} unless my $last = delete $c->{fast};
304              
305             # Blocking operations have a higher precedence
306             return $c->{start}
307 0 0 0       unless $last || ($c->{nb} xor !($self->{queue}->[-1] || {})->{nb});
      0        
      0        
308 0 0 0       $last ||= $c->{nb} ? shift @{$self->{queue}} : pop @{$self->{queue}};
  0            
  0            
309              
310 0 0         return $c->{start} unless $c->{last} = $last;
311 0           warn "-- Client >>> Server (#$last->{id})\n" if DEBUG;
312 0           $stream->write(delete $last->{msg});
313              
314             # Unsafe operations are done when they are written
315 0 0         return $c->{start} if $last->{safe};
316 0           weaken $self;
317 0     0     $stream->write('', sub { $self->_finish(undef, delete($c->{last})->{cb}) });
  0            
318 0           return $c->{start};
319             }
320              
321             1;
322              
323             =encoding utf8
324              
325             =head1 NAME
326              
327             Mango - Pure-Perl non-blocking I/O MongoDB driver
328              
329             =head1 SYNOPSIS
330              
331             use Mango;
332              
333             # Declare a Mango helper
334             sub mango { state $m = Mango->new('mongodb://localhost:27017') }
335             # or in a Mojolicious::Lite app
336             helper mango => sub { state $m = Mango->new('mongodb://localhost:27017') };
337              
338             # Insert document
339             my $oid = mango->db('test')->collection('foo')->insert({bar => 'baz'});
340              
341             # Find document
342             my $doc = mango->db('test')->collection('foo')->find_one({bar => 'baz'});
343             say $doc->{bar};
344              
345             # Update document
346             mango->db('test')->collection('foo')
347             ->update({bar => 'baz'}, {bar => 'yada'});
348              
349             # Remove document
350             mango->db('test')->collection('foo')->remove({bar => 'yada'});
351              
352             # Insert document with special BSON types
353             use Mango::BSON ':bson';
354             my $oid = mango->db('test')->collection('foo')
355             ->insert({data => bson_bin("\x00\x01"), now => bson_time});
356              
357             # Non-blocking concurrent find
358             my $delay = Mojo::IOLoop->delay(sub {
359             my ($delay, @docs) = @_;
360             ...
361             });
362             for my $name (qw(sri marty)) {
363             my $end = $delay->begin(0);
364             mango->db('test')->collection('users')->find({name => $name})->all(sub {
365             my ($cursor, $err, $docs) = @_;
366             $end->(@$docs);
367             });
368             }
369             $delay->wait;
370              
371             # Event loops such as AnyEvent are supported through EV
372             use EV;
373             use AnyEvent;
374             my $cv = AE::cv;
375             mango->db('test')->command(buildInfo => sub {
376             my ($db, $err, $doc) = @_;
377             $cv->send($doc->{version});
378             });
379             say $cv->recv;
380              
381             =head1 DESCRIPTION
382              
383             L is a pure-Perl non-blocking I/O MongoDB driver, optimized for use
384             with the L real-time web framework, and with multiple event loop
385             support. Since MongoDB is still changing rapidly, only the latest stable
386             version is supported.
387              
388             For MongoDB 2.6 support, use L 1.16.
389              
390             To learn more about MongoDB you should take a look at the
391             L, the documentation included
392             in this distribution is no replacement for it.
393              
394             Look at L for CRUD operations.
395              
396             Many arguments passed to methods as well as values of attributes get
397             serialized to BSON with L, which provides many helper functions
398             you can use to generate data types that are not available natively in Perl.
399             All connections will be reset automatically if a new process has been forked,
400             this allows multiple processes to share the same L object safely.
401              
402             For better scalability (epoll, kqueue) and to provide IPv6, SOCKS5 as well as
403             TLS support, the optional modules L (4.0+), L (0.20+),
404             L (0.64+) and L (1.84+) will be used
405             automatically if they are installed. Individual features can also be disabled
406             with the C, C and C environment
407             variables.
408              
409             =head1 EVENTS
410              
411             L inherits all events from L and can emit the
412             following new ones.
413              
414             =head2 connection
415              
416             $mango->on(connection => sub {
417             my ($mango, $id) = @_;
418             ...
419             });
420              
421             Emitted when a new connection has been established.
422              
423             =head1 ATTRIBUTES
424              
425             L implements the following attributes.
426              
427             =head2 default_db
428              
429             my $name = $mango->default_db;
430             $mango = $mango->default_db('test');
431              
432             Default database, defaults to C.
433              
434             =head2 hosts
435              
436             my $hosts = $mango->hosts;
437             $mango = $mango->hosts([['localhost', 3000], ['localhost', 4000]]);
438              
439             Servers to connect to, defaults to C and port C<27017>.
440              
441             =head2 inactivity_timeout
442              
443             my $timeout = $mango->inactivity_timeout;
444             $mango = $mango->inactivity_timeout(15);
445              
446             Maximum amount of time in seconds a connection can be inactive before getting
447             closed, defaults to C<0>. Setting the value to C<0> will allow connections to
448             be inactive indefinitely.
449              
450             =head2 ioloop
451              
452             my $loop = $mango->ioloop;
453             $mango = $mango->ioloop(Mojo::IOLoop->new);
454              
455             Event loop object to use for blocking I/O operations, defaults to a
456             L object.
457              
458             =head2 j
459              
460             my $j = $mango->j;
461             $mango = $mango->j(1);
462              
463             Wait for all operations to have reached the journal, defaults to C<0>.
464              
465             =head2 max_bson_size
466              
467             my $max = $mango->max_bson_size;
468             $mango = $mango->max_bson_size(16777216);
469              
470             Maximum size for BSON documents in bytes, defaults to C<16777216> (16MB).
471              
472             =head2 max_connections
473              
474             my $max = $mango->max_connections;
475             $mango = $mango->max_connections(5);
476              
477             Maximum number of connections to use for non-blocking operations, defaults to
478             C<5>.
479              
480             =head2 max_write_batch_size
481              
482             my $max = $mango->max_write_batch_size;
483             $mango = $mango->max_write_batch_size(1000);
484              
485             Maximum number of write operations to batch together, defaults to C<1000>.
486              
487             =head2 protocol
488              
489             my $protocol = $mango->protocol;
490             $mango = $mango->protocol(Mango::Protocol->new);
491              
492             Protocol handler, defaults to a L object.
493              
494             =head2 w
495              
496             my $w = $mango->w;
497             $mango = $mango->w(2);
498              
499             Wait for all operations to have reached at least this many servers, C<1>
500             indicates just primary, C<2> indicates primary and at least one secondary,
501             defaults to C<1>.
502              
503             =head2 wtimeout
504              
505             my $timeout = $mango->wtimeout;
506             $mango = $mango->wtimeout(1);
507              
508             Timeout for write propagation in milliseconds, defaults to C<1000>.
509              
510             =head1 METHODS
511              
512             L inherits all methods from L and implements the following
513             new ones.
514              
515             =head2 backlog
516              
517             my $num = $mango->backlog;
518              
519             Number of queued operations that have not yet been assigned to a connection.
520              
521             =head2 db
522              
523             my $db = $mango->db;
524             my $db = $mango->db('test');
525              
526             Build L object for database, uses L if no name
527             is provided. Note that the reference L is weakened,
528             so the L object needs to be referenced elsewhere as well.
529              
530             =head2 from_string
531              
532             $mango
533             = $mango->from_string('mongodb://sri:s3cret@localhost:3000/test?w=2');
534              
535             Parse configuration from connection string.
536              
537             =head2 get_more
538              
539             my $reply = $mango->get_more($namespace, $return, $cursor);
540              
541             Perform low level C operation. You can also append a callback to
542             perform operation non-blocking.
543              
544             $mango->get_more(($namespace, $return, $cursor) => sub {
545             my ($mango, $err, $reply) = @_;
546             ...
547             });
548             Mojo::IOLoop->start unless Mojo::IOLoop->is_running;
549              
550             =head2 kill_cursors
551              
552             $mango->kill_cursors(@ids);
553              
554             Perform low level C operation. You can also append a callback to
555             perform operation non-blocking.
556              
557             $mango->kill_cursors(@ids => sub {
558             my ($mango, $err) = @_;
559             ...
560             });
561             Mojo::IOLoop->start unless Mojo::IOLoop->is_running;
562              
563             =head2 new
564              
565             my $mango = Mango->new;
566             my $mango = Mango->new('mongodb://sri:s3cret@localhost:3000/test?w=2');
567              
568             Construct a new L object and parse connection string with
569             L if necessary.
570              
571             Not that is is B recommended to build your Mango object inside
572             a helper function like shown in the synopsis. This is because the Mango's
573             object reference inside L objects is weakened to avoid
574             memory leaks. This means your Mango instance is quickly going to get
575             undefined after you use the C method. So, use a helper to prevent that.
576              
577             If a username and password are provided, Mango will try to authenticate using
578             SCRAM-SHA1. B this will require L which is not
579             installed by default.
580              
581             =head2 query
582              
583             my $reply
584             = $mango->query($namespace, $flags, $skip, $return, $query, $fields);
585              
586             Perform low level C operation. You can also append a callback to
587             perform operation non-blocking.
588              
589             $mango->query(($namespace, $flags, $skip, $return, $query, $fields) => sub {
590             my ($mango, $err, $reply) = @_;
591             ...
592             });
593             Mojo::IOLoop->start unless Mojo::IOLoop->is_running;
594              
595             =head1 DEBUGGING
596              
597             You can set the C environment variable to get some advanced
598             diagnostics information printed to C.
599              
600             MANGO_DEBUG=1
601              
602             =head1 SPONSORS
603              
604             Some of the work on this distribution has been sponsored by
605             L, thank you!
606              
607             =head1 AUTHOR
608              
609             Sebastian Riedel, C.
610              
611             Current maintainer: Olivier Duclos C.
612              
613             =head1 CREDITS
614              
615             In alphabetical order:
616              
617             =over 2
618              
619             alexbyk
620              
621             Andrey Khozov
622              
623             Colin Cyr
624              
625             =back
626              
627             =head1 COPYRIGHT AND LICENSE
628              
629             Copyright (C) 2013-2014, Sebastian Riedel.
630              
631             This program is free software, you can redistribute it and/or modify it under
632             the terms of the Artistic License version 2.0.
633              
634             =head1 SEE ALSO
635              
636             L, L,
637             L.
638              
639             =cut