File Coverage

blib/lib/MongoDBx/Queue.pm
Criterion Covered Total %
statement 23 33 69.7
branch 0 4 0.0
condition n/a
subroutine 8 10 80.0
pod 0 1 0.0
total 31 48 64.5


line stmt bran cond sub pod time code
1 2     2   3271397 use 5.010;
  2         25  
2 2     2   11 use strict;
  2         4  
  2         45  
3 2     2   12 use warnings;
  2         4  
  2         107  
4              
5             package MongoDBx::Queue;
6              
7             # ABSTRACT: A message queue implemented with MongoDB
8              
9             our $VERSION = '2.002';
10              
11 2     2   1217 use Moose 2;
  2         1441337  
  2         31  
12 2     2   16275 use MooseX::Types::Moose qw/:all/;
  2         114811  
  2         17  
13 2     2   18926 use MooseX::AttributeShortcuts;
  2         899635  
  2         10  
14              
15 2     2   284669 use MongoDB 2 ();
  2         48  
  2         52  
16 2     2   13 use namespace::autoclean;
  2         7  
  2         22  
17              
18             with (
19             'MongoDBx::Queue::Role::_CommonOptions',
20             );
21              
22             #--------------------------------------------------------------------------#
23             # Public attributes
24             #--------------------------------------------------------------------------#
25              
26             #pod =attr database_name
27             #pod
28             #pod A MongoDB database name. Unless a C<db_name> is provided in the
29             #pod C<client_options> attribute, this database will be the default for
30             #pod authentication. Defaults to 'test'
31             #pod
32             #pod =attr client_options
33             #pod
34             #pod A hash reference of L<MongoDB::MongoClient> options that will be passed to its
35             #pod C<connect> method.
36             #pod
37             #pod =attr collection_name
38             #pod
39             #pod A collection name for the queue. Defaults to 'queue'. The collection must
40             #pod only be used by MongoDBx::Queue or unpredictable awful things will happen.
41             #pod
42             #pod =attr version
43             #pod
44             #pod The implementation version to use as a backend. Defaults to '1', which is the
45             #pod legacy implementation for backwards compatibility. Version '2' has better
46             #pod index coverage and will perform better for very large queues.
47             #pod
48             #pod B<WARNING> Versions are not compatible. You MUST NOT have V1 and V2 clients
49             #pod using the same database+collection name. See L</MIGRATION BETWEEN VERSIONS>
50             #pod for more.
51             #pod
52             #pod =cut
53              
54             has version => (
55             is => 'ro',
56             isa => Int,
57             default => 1,
58             );
59              
60             #--------------------------------------------------------------------------#
61             # Private attributes and builders
62             #--------------------------------------------------------------------------#
63              
64             has _implementation => (
65             is => 'lazy',
66             handles => [ qw(
67             add_task
68             reserve_task
69             reschedule_task
70             remove_task
71             apply_timeout
72             search
73             peek
74             size
75             waiting
76             )],
77             );
78              
79             sub _build__implementation {
80 0     0     my ($self) = @_;
81 0           my $options = {
82             client_options => $self->client_options,
83             database_name => $self->database_name,
84             collection_name => $self->collection_name,
85             };
86 0 0         if ($self->version == 1) {
    0          
87 0           require MongoDBx::Queue::_V1;
88 0           return MongoDBx::Queue::_V1->new($options);
89             }
90             elsif ($self->version == 2) {
91 0           require MongoDBx::Queue::_V2;
92 0           return MongoDBx::Queue::_V2->new($options);
93             }
94             else {
95 0           die "Invalid MongoDBx::Queue 'version' (must be 1 or 2)"
96             }
97             }
98              
99             sub BUILD {
100 0     0 0   my ($self) = @_;
101 0           $self->_implementation->create_indexes;
102             }
103              
104             #--------------------------------------------------------------------------#
105             # Public method documentation
106             #--------------------------------------------------------------------------#
107              
108             #pod =method new
109             #pod
110             #pod $queue = MongoDBx::Queue->new(
111             #pod version => 2,
112             #pod database_name => "my_app",
113             #pod client_options => {
114             #pod host => "mongodb://example.net:27017",
115             #pod username => "willywonka",
116             #pod password => "ilovechocolate",
117             #pod },
118             #pod );
119             #pod
120             #pod Creates and returns a new queue object.
121             #pod
122             #pod =method add_task
123             #pod
124             #pod $queue->add_task( \%message, \%options );
125             #pod
126             #pod Adds a task to the queue. The C<\%message> hash reference will be shallow
127             #pod copied into the task and not include objects except as described by
128             #pod L<MongoDB::DataTypes>. Top-level keys must not start with underscores, which are
129             #pod reserved for MongoDBx::Queue.
130             #pod
131             #pod The C<\%options> hash reference is optional and may contain the following key:
132             #pod
133             #pod =for :list
134             #pod * C<priority>: sets the priority for the task. Defaults to C<time()>.
135             #pod
136             #pod Note that setting a "future" priority may cause a task to be invisible
137             #pod to C<reserve_task>. See that method for more details.
138             #pod
139             #pod =method reserve_task
140             #pod
141             #pod $task = $queue->reserve_task;
142             #pod $task = $queue->reserve_task( \%options );
143             #pod
144             #pod Atomically marks and returns a task. The task is marked in the queue as
145             #pod "reserved" (in-progress) so it can not be reserved again unless is is
146             #pod rescheduled or timed-out. The task returned is a hash reference containing the
147             #pod data added in C<add_task>, including private keys for use by MongoDBx::Queue
148             #pod methods.
149             #pod
150             #pod Tasks are returned in priority order from lowest to highest. If multiple tasks
151             #pod have identical, lowest priorities, their ordering is undefined. If no tasks
152             #pod are available or visible, it will return C<undef>.
153             #pod
154             #pod The C<\%options> hash reference is optional and may contain the following key:
155             #pod
156             #pod =for :list
157             #pod * C<max_priority>: sets the maximum priority for the task. Defaults to C<time()>.
158             #pod
159             #pod The C<max_priority> option controls whether "future" tasks are visible. If
160             #pod the lowest task priority is greater than the C<max_priority>, this method
161             #pod returns C<undef>.
162             #pod
163             #pod =method reschedule_task
164             #pod
165             #pod $queue->reschedule_task( $task );
166             #pod $queue->reschedule_task( $task, \%options );
167             #pod
168             #pod Releases the reservation on a task so it can be reserved again.
169             #pod
170             #pod The C<\%options> hash reference is optional and may contain the following key:
171             #pod
172             #pod =for :list
173             #pod * C<priority>: sets the priority for the task. Defaults to the task's original priority.
174             #pod
175             #pod Note that setting a "future" priority may cause a task to be invisible
176             #pod to C<reserve_task>. See that method for more details.
177             #pod
178             #pod =method remove_task
179             #pod
180             #pod $queue->remove_task( $task );
181             #pod
182             #pod Removes a task from the queue (i.e. indicating the task has been processed).
183             #pod
184             #pod =method apply_timeout
185             #pod
186             #pod $queue->apply_timeout( $seconds );
187             #pod
188             #pod Removes reservations that occurred more than C<$seconds> ago. If no
189             #pod argument is given, the timeout defaults to 120 seconds. The timeout
190             #pod should be set longer than the expected task processing time, so that
191             #pod only dead/hung tasks are returned to the active queue.
192             #pod
193             #pod =method search
194             #pod
195             #pod my @results = $queue->search( \%query, \%options );
196             #pod
197             #pod Returns a list of tasks in the queue based on search criteria. The
198             #pod query should be expressed in the usual MongoDB fashion. In addition
199             #pod to MongoDB options (e.g. C<limit>, C<skip> and C<sort>) as described
200             #pod in the MongoDB documentation for L<MongoDB::Collection/find>, this method
201             #pod supports a C<reserved> option. If present, results will be limited to reserved
202             #pod tasks if true or unreserved tasks if false.
203             #pod
204             #pod =method peek
205             #pod
206             #pod $task = $queue->peek( $task );
207             #pod
208             #pod Retrieves a full copy of the task from the queue. This is useful to retrieve all
209             #pod fields from a projected result from C<search>. It is equivalent to:
210             #pod
211             #pod $self->search( { _id => $task->{_id} } );
212             #pod
213             #pod Returns undef if the task is not found.
214             #pod
215             #pod =method size
216             #pod
217             #pod $queue->size;
218             #pod
219             #pod Returns the number of tasks in the queue, including in-progress ones.
220             #pod
221             #pod =method waiting
222             #pod
223             #pod $queue->waiting;
224             #pod
225             #pod Returns the number of tasks in the queue that have not been reserved.
226             #pod
227             #pod =cut
228              
229             __PACKAGE__->meta->make_immutable;
230              
231             1;
232              
233              
234             # vim: ts=4 sts=4 sw=4 et:
235              
236             __END__
237              
238             =pod
239              
240             =encoding UTF-8
241              
242             =head1 NAME
243              
244             MongoDBx::Queue - A message queue implemented with MongoDB
245              
246             =head1 VERSION
247              
248             version 2.002
249              
250             =head1 SYNOPSIS
251              
252             use v5.10;
253             use MongoDBx::Queue;
254              
255             my $queue = MongoDBx::Queue->new(
256             version => 2,
257             database_name => "queue_db",
258             client_options => {
259             host => "mongodb://example.net:27017",
260             username => "willywonka",
261             password => "ilovechocolate",
262             }
263             );
264              
265             $queue->add_task( { msg => "Hello World" } );
266             $queue->add_task( { msg => "Goodbye World" } );
267              
268             while ( my $task = $queue->reserve_task ) {
269             say $task->{msg};
270             $queue->remove_task( $task );
271             }
272              
273             =head1 DESCRIPTION
274              
275             MongoDBx::Queue implements a simple, prioritized message queue using MongoDB as
276             a backend. By default, messages are prioritized by insertion time, creating a
277             FIFO queue.
278              
279             On a single host with MongoDB, it provides a zero-configuration message service
280             across local applications. Alternatively, it can use a MongoDB database
281             cluster that provides replication and fail-over for an even more durable,
282             multi-host message queue.
283              
284             Features:
285              
286             =over 4
287              
288             =item *
289              
290             messages as hash references, not objects
291              
292             =item *
293              
294             arbitrary message fields
295              
296             =item *
297              
298             arbitrary scheduling on insertion
299              
300             =item *
301              
302             atomic message reservation
303              
304             =item *
305              
306             stalled reservations can be timed-out
307              
308             =item *
309              
310             task rescheduling
311              
312             =item *
313              
314             automatically creates correct index
315              
316             =item *
317              
318             fork-safe
319              
320             =back
321              
322             Not yet implemented:
323              
324             =over 4
325              
326             =item *
327              
328             parameter checking
329              
330             =item *
331              
332             error handling
333              
334             =back
335              
336             Warning: do not use with capped collections, as the queued messages will not
337             meet the constraints required by a capped collection.
338              
339             =head1 ATTRIBUTES
340              
341             =head2 database_name
342              
343             A MongoDB database name. Unless a C<db_name> is provided in the
344             C<client_options> attribute, this database will be the default for
345             authentication. Defaults to 'test'
346              
347             =head2 client_options
348              
349             A hash reference of L<MongoDB::MongoClient> options that will be passed to its
350             C<connect> method.
351              
352             =head2 collection_name
353              
354             A collection name for the queue. Defaults to 'queue'. The collection must
355             only be used by MongoDBx::Queue or unpredictable awful things will happen.
356              
357             =head2 version
358              
359             The implementation version to use as a backend. Defaults to '1', which is the
360             legacy implementation for backwards compatibility. Version '2' has better
361             index coverage and will perform better for very large queues.
362              
363             B<WARNING> Versions are not compatible. You MUST NOT have V1 and V2 clients
364             using the same database+collection name. See L</MIGRATION BETWEEN VERSIONS>
365             for more.
366              
367             =head1 METHODS
368              
369             =head2 new
370              
371             $queue = MongoDBx::Queue->new(
372             version => 2,
373             database_name => "my_app",
374             client_options => {
375             host => "mongodb://example.net:27017",
376             username => "willywonka",
377             password => "ilovechocolate",
378             },
379             );
380              
381             Creates and returns a new queue object.
382              
383             =head2 add_task
384              
385             $queue->add_task( \%message, \%options );
386              
387             Adds a task to the queue. The C<\%message> hash reference will be shallow
388             copied into the task and not include objects except as described by
389             L<MongoDB::DataTypes>. Top-level keys must not start with underscores, which are
390             reserved for MongoDBx::Queue.
391              
392             The C<\%options> hash reference is optional and may contain the following key:
393              
394             =over 4
395              
396             =item *
397              
398             C<priority>: sets the priority for the task. Defaults to C<time()>.
399              
400             =back
401              
402             Note that setting a "future" priority may cause a task to be invisible
403             to C<reserve_task>. See that method for more details.
404              
405             =head2 reserve_task
406              
407             $task = $queue->reserve_task;
408             $task = $queue->reserve_task( \%options );
409              
410             Atomically marks and returns a task. The task is marked in the queue as
411             "reserved" (in-progress) so it can not be reserved again unless is is
412             rescheduled or timed-out. The task returned is a hash reference containing the
413             data added in C<add_task>, including private keys for use by MongoDBx::Queue
414             methods.
415              
416             Tasks are returned in priority order from lowest to highest. If multiple tasks
417             have identical, lowest priorities, their ordering is undefined. If no tasks
418             are available or visible, it will return C<undef>.
419              
420             The C<\%options> hash reference is optional and may contain the following key:
421              
422             =over 4
423              
424             =item *
425              
426             C<max_priority>: sets the maximum priority for the task. Defaults to C<time()>.
427              
428             =back
429              
430             The C<max_priority> option controls whether "future" tasks are visible. If
431             the lowest task priority is greater than the C<max_priority>, this method
432             returns C<undef>.
433              
434             =head2 reschedule_task
435              
436             $queue->reschedule_task( $task );
437             $queue->reschedule_task( $task, \%options );
438              
439             Releases the reservation on a task so it can be reserved again.
440              
441             The C<\%options> hash reference is optional and may contain the following key:
442              
443             =over 4
444              
445             =item *
446              
447             C<priority>: sets the priority for the task. Defaults to the task's original priority.
448              
449             =back
450              
451             Note that setting a "future" priority may cause a task to be invisible
452             to C<reserve_task>. See that method for more details.
453              
454             =head2 remove_task
455              
456             $queue->remove_task( $task );
457              
458             Removes a task from the queue (i.e. indicating the task has been processed).
459              
460             =head2 apply_timeout
461              
462             $queue->apply_timeout( $seconds );
463              
464             Removes reservations that occurred more than C<$seconds> ago. If no
465             argument is given, the timeout defaults to 120 seconds. The timeout
466             should be set longer than the expected task processing time, so that
467             only dead/hung tasks are returned to the active queue.
468              
469             =head2 search
470              
471             my @results = $queue->search( \%query, \%options );
472              
473             Returns a list of tasks in the queue based on search criteria. The
474             query should be expressed in the usual MongoDB fashion. In addition
475             to MongoDB options (e.g. C<limit>, C<skip> and C<sort>) as described
476             in the MongoDB documentation for L<MongoDB::Collection/find>, this method
477             supports a C<reserved> option. If present, results will be limited to reserved
478             tasks if true or unreserved tasks if false.
479              
480             =head2 peek
481              
482             $task = $queue->peek( $task );
483              
484             Retrieves a full copy of the task from the queue. This is useful to retrieve all
485             fields from a projected result from C<search>. It is equivalent to:
486              
487             $self->search( { _id => $task->{_id} } );
488              
489             Returns undef if the task is not found.
490              
491             =head2 size
492              
493             $queue->size;
494              
495             Returns the number of tasks in the queue, including in-progress ones.
496              
497             =head2 waiting
498              
499             $queue->waiting;
500              
501             Returns the number of tasks in the queue that have not been reserved.
502              
503             =for Pod::Coverage BUILD
504              
505             =head1 MIGRATION BETWEEN VERSIONS
506              
507             Implementation versions are not compatible. Migration of active tasks from
508             version '1' to version '2' is an exercise left to end users.
509              
510             One approach to migration could be to run a script with two C<MongoDBx::Queue>
511             clients, one using version '1' and one using version '2', using different
512             C<database_name> attributes. Such a script could iteratively reserve a task
513             with the v1 client, add the task via the v2 client, then remove it via the v1
514             client. Workers could be operating on one or both versions of the queue while
515             migration is going on, depending on your needs.
516              
517             =for :stopwords cpan testmatrix url bugtracker rt cpants kwalitee diff irc mailto metadata placeholders metacpan
518              
519             =head1 SUPPORT
520              
521             =head2 Bugs / Feature Requests
522              
523             Please report any bugs or feature requests through the issue tracker
524             at L<https://github.com/dagolden/MongoDBx-Queue/issues>.
525             You will be notified automatically of any progress on your issue.
526              
527             =head2 Source Code
528              
529             This is open source software. The code repository is available for
530             public review and contribution under the terms of the license.
531              
532             L<https://github.com/dagolden/MongoDBx-Queue>
533              
534             git clone https://github.com/dagolden/MongoDBx-Queue.git
535              
536             =head1 AUTHOR
537              
538             David Golden <dagolden@cpan.org>
539              
540             =head1 COPYRIGHT AND LICENSE
541              
542             This software is Copyright (c) 2012 by David Golden.
543              
544             This is free software, licensed under:
545              
546             The Apache License, Version 2.0, January 2004
547              
548             =cut