File Coverage

blib/lib/MangoX/Queue.pm
Criterion Covered Total %
statement 21 256 8.2
branch 0 160 0.0
condition 0 49 0.0
subroutine 7 39 17.9
pod 14 17 82.3
total 42 521 8.0


line stmt bran cond sub pod time code
1             package MangoX::Queue;
2              
3 12     12   3676178 use Mojo::Base 'Mojo::EventEmitter';
  12         28  
  12         76  
4              
5 12     12   2377 use Carp 'croak';
  12         25  
  12         651  
6 12     12   10267 use Mojo::Log;
  12         29225  
  12         111  
7 12     12   345 use Mango::BSON ':bson';
  12         26  
  12         2569  
8 12     12   7965 use MangoX::Queue::Delay;
  12         33  
  12         113  
9 12     12   6967 use MangoX::Queue::Job;
  12         35  
  12         125  
10              
11             our $VERSION = '0.16';
12              
13             # A logger
14             has 'log' => sub { Mojo::Log->new->level('error') };
15              
16             # The Mango::Collection representing the queue
17             has 'collection';
18             has 'capped' => sub { $_[0]->stats->{capped} };
19             has 'stats' => sub { $_[0]->collection->stats };
20              
21             # A MangoX::Queue::Delay
22             has 'delay' => sub { MangoX::Queue::Delay->new };
23              
24             # How long to wait before assuming a job has failed
25             has 'timeout' => sub { $ENV{MANGOX_QUEUE_JOB_TIMEOUT} // 60 };
26              
27             # How many times to retry a job before giving up
28             has 'retries' => sub { $ENV{MANGOX_QUEUE_JOB_RETRIES} // 5 };
29              
30             # Prevent binary object IDs escaping MangoX::Queue
31             has 'no_binary_oid' => sub { $ENV{MANGOX_QUEUE_NO_BINARY_OID} // 0 };
32              
33             # Current number of jobs that have been consumed but not yet completed
34             has 'job_count' => 0;
35              
36             # Maximum number of jobs allowed to be in a consumed state at any one time
37             has 'concurrent_job_limit' => 10;
38              
39             # Store Mojo::IOLoop->timer IDs
40             has 'consumers' => sub { {} };
41              
42             # Store plugins
43             has 'plugins' => sub { {} };
44              
45             # Compatibility with Mojo::IOLoop->delay
46             has 'delay_compat' => sub { warn "delay_compat will be enabled by default in a future release - please migrate your code"; 0 };
47              
48             sub new {
49 0     0 1   my $self = shift->SUPER::new(@_);
50              
51 0 0         croak qq{No Mango::Collection provided to constructor} unless ref($self->collection) eq 'Mango::Collection';
52              
53 0           return $self;
54             }
55              
56             sub plugin {
57 0     0 0   my ($self, $name, $options) = @_;
58              
59 0 0         croak qq{Plugin $name already loaded} if exists $self->plugins->{$name};
60              
61             {
62 12     12   6764 no strict 'refs';
  12         20  
  12         79567  
  0            
63 0 0         unless($name->can('new')) {
64 0 0         eval "require $name" or croak qq{Failed to load plugin $name: $@};
65             }
66             }
67              
68 0 0         eval {
69 0           $self->plugins->{$name} = $name->new(%$options);
70 0           return 1;
71             } or croak qq{Error calling constructor for plugin $name: $@};
72              
73 0 0         eval {
74 0           $self->plugins->{$name}->register($self);
75 0           return 1;
76             } or croak qq{Error calling register for plugin $name: $@};
77              
78 0           return $self->plugins->{$name};
79             }
80              
81             sub init_status {
82 0     0 0   my ($self) = @_;
83             # Done as late as possible - $self->capped opens a DB connection
84 0 0         $self->pending_status($self->capped ? 1 : 'Pending');
85 0 0         $self->processing_status($self->capped ? 2 : 'Processing');
86 0 0         $self->failed_status($self->capped ? 3 : 'Failed');
87             return {
88 0           pending_and_processing_status => $self->{pending_and_processing_status},
89             _pending_status => $self->{_pending_status},
90             _processing_status => $self->{_processing_status},
91             _failed_status => $self->{_failed_status},
92             };
93             }
94              
95             sub pending_status {
96 0     0 1   my ($self, $new_status) = @_;
97              
98 0 0         return $self->{_pending_status} unless defined $new_status;
99              
100 0           $self->{_pending_status} = $new_status;
101 0           $self->_combine_pending_and_processing_status();
102             }
103              
104             sub processing_status {
105 0     0 1   my ($self, $new_status) = @_;
106              
107 0 0         return $self->{_processing_status} unless defined $new_status;
108              
109 0           $self->{_processing_status} = $new_status;
110 0           $self->_combine_pending_and_processing_status();
111             }
112              
113             sub failed_status {
114 0     0 1   my ($self, $new_status) = @_;
115              
116 0 0         return $self->{_failed_status} unless defined $new_status;
117 0           $self->{_failed_status} = $new_status;
118             }
119              
120             sub _combine_pending_and_processing_status {
121 0     0     my ($self) = @_;
122              
123 0           $self->{pending_and_processing_status} = [
124 0 0         ref($self->{_pending_status}) eq 'ARRAY' ? @{$self->{_pending_status}} : $self->{_pending_status},
125             $self->{_processing_status},
126             ];
127             }
128              
129             sub get_options {
130 0     0 1   my ($self) = @_;
131              
132             return {
133 0   0       query => {
      0        
134             status => {
135             '$in' => $self->{pending_and_processing_status} // $self->init_status->{pending_and_processing_status},
136             },
137             processing => {
138             '$lt' => time - $self->timeout,
139             },
140             attempt => {
141             '$lte' => $self->retries + 1,
142             },
143             delay_until => {
144             '$lt' => time,
145             }
146             },
147             sort => bson_doc( # Sort by priority, then in order of creation
148             'priority' => 1,
149             'created' => -1,
150             ),
151             update => {
152             '$set' => {
153             processing => time,
154             status => $self->{_processing_status} // $self->init_status->{_processing_status},
155             },
156             '$inc' => {
157             attempt => 1,
158             }
159             }
160             };
161             }
162              
163             sub run_callback {
164 0     0 0   my ($self, $callback) = (shift, shift);
165              
166 0 0         if ($self->delay_compat) {
167 0           $callback->($self, @_);
168             } else {
169 0           $callback->(@_);
170             }
171             }
172              
173             sub enqueue {
174 0     0 1   my ($self, @args) = @_;
175              
176             # args maybe
177             # - 'job_name'
178             # - foo => bar, 'job_name'
179             # - 'job_name', $callback
180             # - foo => bar, 'job_name', $callback
181              
182 0 0         my $callback = ref($args[-1]) eq 'CODE' ? pop @args : undef;
183 0           my $job = pop @args;
184 0           my %args;
185 0 0         %args = (@args) if scalar @args;
186              
187 0   0       my $db_job = {
      0        
      0        
      0        
188             priority => $args{priority} // 1,
189             created => $args{created} // bson_time,
190             data => $job,
191             status => $args{status} // $self->{_pending_status} // $self->init_status->{_pending_status},
192             attempt => 1,
193             processing => 0,
194             delay_until => 0,
195             };
196              
197 0 0         $db_job->{delay_until} = $args{delay_until} if $args{delay_until};
198              
199 0 0         if($callback) {
200             return $self->collection->insert($db_job => sub {
201 0     0     my ($collection, $error, $oid) = @_;
202 0 0         if($error) {
203 0           $self->emit_safe(error => qq{Error inserting job into collection: $error}, $db_job, $error);
204 0           $self->run_callback($callback, $db_job, $error);
205 0           return;
206             }
207 0           $db_job->{_id} = $oid;
208 0 0         $self->emit_safe(enqueued => $db_job) if $self->has_subscribers('enqueued');
209 0 0         eval {
210 0           $self->run_callback($callback, $db_job, undef);
211 0           return 1;
212             } or $self->emit_safe(error => qq{Error in callback: $@}, $db_job, $@);
213 0           });
214             } else {
215 0 0         eval {
216 0           $db_job->{_id} = $self->collection->insert($db_job);
217 0           return 1;
218             } or croak qq{Error inserting job into collection: $@};
219 0 0         $self->emit_safe(enqueued => $db_job) if $self->has_subscribers('enqueued');
220 0           return $db_job;
221             }
222             }
223              
224             sub watch {
225 0     0 1   my ($self, $id_or_job, $status, $callback) = @_;
226              
227 0 0         my $id = ref($id_or_job) ? $id_or_job->{_id} : $id_or_job;
228              
229 0   0       $status //= 'Complete';
230              
231             # args
232             # - watch $queue $id, 'Status' => $callback
233              
234 0 0         if($callback) {
235             # Non-blocking
236 0           $self->log->debug("Waiting for $id on status $status in non-blocking mode");
237 0     0     return Mojo::IOLoop->timer(0 => sub { $self->_watch_nonblocking($id, $status, $callback) });
  0            
238             } else {
239             # Blocking
240 0           $self->log->debug("Waiting for $id on status $status in blocking mode");
241 0           return $self->_watch_blocking($id, $status);
242             }
243             }
244              
245             sub _watch_blocking {
246 0     0     my ($self, $id, $status) = @_;
247              
248 0           while(1) {
249 0           my $doc = $self->collection->find_one({'_id' => $id});
250 0 0         $self->log->debug("Job found by Mango: " . ($doc ? 'Yes' : 'No'));
251              
252 0 0 0       if($doc && ((!ref($status) && $doc->{status} eq $status) || (ref($status) eq 'ARRAY' && grep { $_ =~ $doc->{status} } @$status))) {
      0        
253 0           return 1;
254             } else {
255 0           $self->delay->wait;
256             }
257             }
258             }
259              
260             sub _watch_nonblocking {
261 0     0     my ($self, $id, $status, $callback) = @_;
262              
263             $self->collection->find_one({'_id' => $id} => sub {
264 0     0     my ($cursor, $err, $doc) = @_;
265 0 0         $self->log->debug("Job found by Mango: " . ($doc ? 'Yes' : 'No'));
266              
267 0 0 0       if($doc && ((!ref($status) && $doc->{status} eq $status) || (ref($status) eq 'ARRAY' && grep { $_ =~ $doc->{status} } @$status))) {
      0        
268 0           $self->log->debug("Status is $status");
269 0           $self->delay->reset;
270 0           $self->run_callback($callback, $doc, undef);
271             } else {
272 0           $self->log->debug("Job not found or status doesn't match");
273             $self->delay->wait(sub {
274 0 0         return unless Mojo::IOLoop->is_running;
275 0           Mojo::IOLoop->timer(0 => sub { $self->_watch_nonblocking($id, $status, $callback) });
  0            
276 0           });
277 0           return undef;
278             }
279 0           });
280             }
281              
282             sub requeue {
283 0     0 1   my ($self, $job, $callback) = @_;
284              
285 0   0       my $pending = $self->{_pending_status} // $self->init_status->{_pending_status};
286 0 0         $job->{status} = ref($pending) eq 'ARRAY' ? $pending->[0] : $pending;
287 0           return $self->update($job, $callback);
288             }
289              
290             sub dequeue {
291 0     0 1   my ($self, $id_or_job, $callback) = @_;
292              
293             # TODO option to not remove on dequeue?
294              
295 0 0         my $id = ref($id_or_job) ? $id_or_job->{_id} : $id_or_job;
296              
297 0 0         if($callback) {
298             $self->collection->remove({'_id' => $id} => sub {
299 0     0     my ($collection, $error, $doc) = @_;
300              
301 0 0         if($error) {
302 0 0         $self->emit_safe(error => qq(Error removing job from collection: $error), $id_or_job, $error) if $self->has_subscribers('error');
303 0           $self->run_callback($callback, $id_or_job, $error);
304 0           return;
305             }
306              
307 0           $self->run_callback($callback, $id_or_job, undef);
308 0 0         $self->emit_safe(dequeued => $id_or_job) if $self->has_subscribers('dequeued');
309 0           });
310             } else {
311 0           $self->collection->remove({'_id' => $id});
312 0 0         $self->emit_safe(dequeued => $id_or_job) if $self->has_subscribers('dequeued');
313             }
314             }
315              
316             sub get {
317 0     0 1   my ($self, $id_or_job, $callback) = @_;
318              
319 0 0         my $id = ref($id_or_job) ? $id_or_job->{_id} : $id_or_job;
320              
321 0 0         if($callback) {
322             return $self->collection->find_one({'_id' => $id} => sub {
323 0     0     my ($collection, $error, $doc) = @_;
324              
325 0 0         if($error) {
326 0 0         $self->emit_safe(error => qq(Error retrieving job: $error), $id_or_job, $error) if $self->has_subscribers('error');
327             }
328              
329 0           $self->run_callback($callback, $doc, $error);
330 0           });
331             } else {
332 0           return $self->collection->find_one({'_id' => $id});
333             }
334             }
335              
336             sub update {
337 0     0 1   my ($self, $job, $callback) = @_;
338              
339             # FIXME Temporary fix to remove has_finished indicator from MangoX::Queue::Job
340 0           $job = { map { $_ => $job->{$_} } grep { $_ !~ /^(?:has_finished|events)$/ } keys %$job };
  0            
  0            
341 0 0         $job->{_id} = Mango::BSON::ObjectID->new($job->{_id}) if $self->no_binary_oid;
342              
343 0 0         if($callback) {
344             return $self->collection->update({'_id' => $job->{_id}}, $job => sub {
345 0     0     my ($collection, $error, $doc) = @_;
346 0 0         if($error) {
347 0 0         $self->emit_safe(error => qq(Error updating job: $error), $job, $error) if $self->has_subscribers('error');
348             }
349 0           $self->run_callback($callback, $doc, $error);
350 0           });
351             } else {
352 0 0         return $self->collection->update({'_id' => $job->{_id}}, $job, {upsert => 1}) or croak qq{Error updating collection: $@};
353             }
354             }
355              
356             sub fetch {
357 0     0 1   my ($self, @args) = @_;
358              
359             # fetch $queue status => 'Complete', sub { my $job = shift; }
360              
361 0 0         my $callback = ref($args[-1]) eq 'CODE' ? pop @args : undef;
362 0           my %args;
363 0 0         %args = (@args) if scalar @args;
364              
365 0           $self->log->debug("In fetch");
366              
367 0 0         if($callback) {
368 0           $self->log->debug("Fetching in non-blocking mode");
369 0           my $consumer_id = (scalar keys %{$self->consumers}) + 1;
  0            
370 0     0     $self->consumers->{$consumer_id} = Mojo::IOLoop->timer(0 => sub { $self->_consume_nonblocking(\%args, $consumer_id, $callback, 1) });
  0            
371 0           return $consumer_id;
372             } else {
373 0           $self->log->debug("Fetching in blocking mode");
374 0           return $self->_consume_blocking(\%args, 1);
375             }
376             }
377              
378             sub consume {
379 0     0 1   my ($self, @args) = @_;
380              
381             # consume $queue status => 'Failed', sub { my $job = shift; }
382              
383 0 0         my $callback = ref($args[-1]) eq 'CODE' ? pop @args : undef;
384 0           my %args;
385 0 0         %args = (@args) if scalar @args;
386              
387 0           $self->log->debug("In consume");
388              
389 0 0         if($callback) {
390 0           $self->log->debug("consuming in non-blocking mode");
391 0           my $consumer_id = (scalar keys %{$self->consumers}) + 1;
  0            
392 0     0     $self->consumers->{$consumer_id} = Mojo::IOLoop->timer(0 => sub { $self->_consume_nonblocking(\%args, $consumer_id, $callback, 0) });
  0            
393 0           $self->log->debug("Timer scheduled, consumer_id $consumer_id has timer ID: " . $self->consumers->{$consumer_id});
394 0           return $consumer_id;
395             } else {
396 0           $self->log->debug("consuming in blocking mode");
397 0           return $self->_consume_blocking(\%args, 0);
398             }
399             }
400              
401             sub release {
402 0     0 1   my ($self, $consumer_id) = @_;
403              
404 0           $self->log->debug("Releasing consumer $consumer_id with timer ID: " . $self->consumers->{$consumer_id});
405              
406 0           Mojo::IOLoop->remove($self->consumers->{$consumer_id});
407 0           delete $self->consumers->{$consumer_id};
408              
409 0           return 1;
410             }
411              
412             sub _consume_blocking {
413 0     0     my ($self, $args, $fetch) = @_;
414              
415 0           while(1) {
416 0           my $opts = $self->get_options;
417 0 0         $opts->{query} = $args if scalar keys %$args;
418              
419 0           my $doc = $self->collection->find_and_modify($opts);
420 0 0         $self->log->debug("Job found by Mango: " . ($doc ? 'Yes' : 'No'));
421              
422 0 0 0       if($doc && $doc->{attempt} > $self->retries) {
423 0   0       $doc->{status} = $self->{_failed_status} // $self->init_status->{_failed_status};
424 0           $self->update($doc);
425 0           $doc = undef;
426 0           $self->log->debug("Job exceeded retries, status set to failed and job abandoned");
427             }
428              
429 0 0         if($doc) {
430 0 0         $doc->{_id} = $doc->{_id}->to_string if $self->no_binary_oid;
431 0 0         $self->emit_safe(consumed => $doc) if $self->has_subscribers('consumed');
432 0           return $doc;
433             } else {
434 0 0         last if $fetch;
435 0           $self->delay->wait;
436             }
437             }
438             }
439              
440             sub _consume_nonblocking {
441 0     0     my ($self, $args, $consumer_id, $callback, $fetch) = @_;
442              
443 0 0         $self->log->debug("Active jobs: " . $self->job_count . '/' . ($self->concurrent_job_limit < 0 ? '*' : $self->concurrent_job_limit));
444              
445             # Don't allow consumption if job_count has been reached
446 0 0 0       if ($self->concurrent_job_limit > -1 && $self->job_count >= $self->concurrent_job_limit) {
447 0 0         return unless Mojo::IOLoop->is_running;
448 0 0         return if $fetch;
449 0 0         $self->emit_safe(concurrent_job_limit_reached => $self->concurrent_job_limit) if $self->has_subscribers('concurrent_job_limit_reached');
450 0           $self->log->debug("concurrent_job_limit_reached = " . $self->concurrent_job_limit . ", job_count = " . $self->job_count);
451 0 0         return unless exists $self->consumers->{$consumer_id};
452              
453             $self->delay->wait(sub {
454 0 0   0     return unless exists $self->consumers->{$consumer_id};
455 0           $self->_consume_nonblocking($args, $consumer_id, $callback, 0);
456 0           });
457              
458 0           $self->log->debug("Timer rescheduled (job_count limit reached), consumer_id $consumer_id has timer ID: " . $self->consumers->{$consumer_id});
459              
460 0           return;
461             }
462              
463 0           my $opts = $self->get_options;
464 0 0         $opts->{query} = $args if scalar keys %$args;
465              
466             $self->collection->find_and_modify($opts => sub {
467 0     0     my ($cursor, $err, $doc) = @_;
468 0 0         $self->log->debug("Job found by Mango: " . ($doc ? 'Yes' : 'No'));
469              
470 0 0         if($err) {
471 0           $self->log->error($err);
472 0           $self->emit_safe(error => $err);
473             }
474              
475 0 0 0       if($doc && $doc->{attempt} > $self->retries) {
476 0   0       $doc->{status} = $self->{_failed_status} // $self->init_status->{_failed_status};
477 0           $self->update($doc);
478 0           $doc = undef;
479 0           $self->log->debug("Job exceeded retries, status set to failed and job abandoned");
480             }
481              
482 0 0         if($doc) {
483 0 0         $doc->{_id} = $doc->{_id}->to_string if $self->no_binary_oid;
484              
485 0           $self->job_count($self->job_count + 1);
486 0           $self->log->debug("job_count incremented to " . $self->job_count);
487              
488 0           my $job = MangoX::Queue::Job->new($doc);
489             $job->once(finished => sub {
490 0           $self->job_count($self->job_count - 1);
491 0           $self->log->debug('job_count decremented to ' . $self->job_count);
492 0           });
493              
494 0           $self->delay->reset;
495 0 0         $self->emit_safe(consumed => $job) if $self->has_subscribers('consumed');
496              
497 0 0         eval {
498 0           $self->run_callback($callback, $job);
499 0           return 1;
500             } or $self->emit_safe(error => "Error in callback: $@");
501 0 0         return unless Mojo::IOLoop->is_running;
502 0 0         return if $fetch;
503 0 0         return unless exists $self->consumers->{$consumer_id};
504             Mojo::IOLoop->timer(0, sub {
505 0           $self->_consume_nonblocking($args, $consumer_id, $callback, 0) }
506 0           );
507 0           $self->log->debug("Timer rescheduled (recursive immediate), consumer_id $consumer_id has timer ID: " . $self->consumers->{$consumer_id});
508             } else {
509 0 0         return unless Mojo::IOLoop->is_running;
510 0 0         return if $fetch;
511             $self->delay->wait(sub {
512 0 0         return unless exists $self->consumers->{$consumer_id};
513 0           $self->_consume_nonblocking($args, $consumer_id, $callback, 0);
514 0           });
515 0           $self->log->debug("Timer rescheduled (recursive delayed), consumer_id $consumer_id has timer ID: " . $self->consumers->{$consumer_id});
516 0           return undef;
517             }
518 0           });
519             }
520              
521             1;
522              
523             =encoding utf8
524              
525             =head1 NAME
526              
527             MangoX::Queue - A MongoDB queue implementation using Mango
528              
529             =head1 DESCRIPTION
530              
531             L is a MongoDB backed queue implementation using L to support
532             blocking and non-blocking queues.
533              
534             L makes no attempt to handle the L connection, database or
535             collection - pass in a collection to the constructor and L will
536             use it. The collection can be plain, capped or sharded.
537              
538             For an introduction to L, see L.
539              
540             B - the current API is inconsistent with L and other L
541             modules. A C option has been added, which is currently disabled by default.
542             This will be enabled by default in a future release, and eventually deprecated.
543              
544             =head1 SYNOPSIS
545              
546             =head2 Non-blocking
547              
548             Non-blocking mode requires a running L.
549              
550             my $queue = MangoX::Queue->new(collection => $mango_collection);
551              
552             # To add a job
553             enqueue $queue 'test' => sub { my $id = shift; };
554              
555             # To set options
556             enqueue $queue priority => 1, created => bson_time, 'test' => sub { my $id = shift; };
557              
558             # To watch for a specific job status
559             watch $queue $id, 'Complete' => sub {
560             # Job status is 'Complete'
561             };
562              
563             # To fetch a job
564             fetch $queue sub {
565             my ($job) = @_;
566             # ...
567             };
568              
569             # To get a job by id
570             get $queue $id => sub { my $job = shift; };
571              
572             # To requeue a job
573             requeue $queue $job => sub { my $id = shift; };
574              
575             # To dequeue a job
576             dequeue $queue $id => sub { };
577              
578             # To consume a queue
579             my $consumer = consume $queue sub {
580             my ($job) = @_;
581             # ...
582             };
583              
584             # To stop consuming a queue
585             release $queue $consumer;
586              
587             # To listen for errors
588             on $queue error => sub { my ($queue, $error) = @_; };
589              
590             =head2 Blocking
591              
592             my $queue = MangoX::Queue->new(collection => $mango_collection);
593              
594             # To add a job
595             my $id = enqueue $queue 'test';
596              
597             # To set options
598             my $id = enqueue $queue priority => 1, created => bson_time, 'test';
599              
600             # To watch for a specific job status
601             watch $queue $id;
602              
603             # To fetch a job
604             my $job = fetch $queue;
605              
606             # To get a job by id
607             my $job = get $queue $id;
608              
609             # To requeue a job
610             my $id = requeue $queue $job;
611              
612             # To dequeue a job
613             dequeue $queue $id;
614              
615             # To consume a queue
616             while(my $job = consume $queue) {
617             # ...
618             }
619              
620             =head2 Other
621              
622             my $queue = MangoX::Queue->new(collection => $mango_collection);
623              
624             # To listen for events
625             on $queue enqueued => sub ( my ($queue, $job) = @_; };
626             on $queue dequeued => sub ( my ($queue, $job) = @_; };
627             on $queue consumed => sub { my ($queue, $job) = @_; };
628              
629             # To register a plugin
630             plugin $queue 'MangoX::Queue::Plugin::Statsd';
631              
632             =head1 ATTRIBUTES
633              
634             L implements the following attributes.
635              
636             =head2 collection
637              
638             my $collection = $queue->collection;
639             $queue->collection($mango->db('foo')->collection('bar'));
640              
641             my $queue = MangoX::Queue->new(collection => $collection);
642              
643             The L representing the MongoDB queue collection.
644              
645             =head2 delay
646              
647             my $delay = $queue->delay;
648             $queue->delay(MangoX::Queue::Delay->new);
649              
650             The L responsible for dynamically controlling the
651             delay between queue queries.
652              
653             =head2 delay_compat
654              
655             my $compat = $queue->delay_compat;
656             $queue->delay_compat(1);
657              
658             Enabling C passes C<$self> as the first argument to queue
659             callbacks, to fix a compatibility bug with L.
660              
661             This will be enabled by default in a future release. Please migrate your
662             code to work with the new API, and enable C on construction:
663              
664             my $queue = MangoX::Queue->new(delay_compat => 1);
665              
666             =head2 concurrent_job_limit
667              
668             my $concurrent_job_limit = $queue->concurrent_job_limit;
669             $queue->concurrent_job_limit(20);
670              
671             The maximum number of concurrent jobs (jobs consumed from the queue and unfinished). Defaults to 10.
672              
673             This only applies to jobs on the queue in non-blocking mode. L has an internal counter
674             that is incremented when a job has been consumed from the queue (in non-blocking mode). The job
675             returned is a L instance and has a descructor method that is called to decrement
676             the internal counter. See L for more details.
677              
678             Set to -1 to disable queue concurrency limits. B, this could result in
679             out of memory errors or an extremely slow event loop.
680              
681             If you need to decrement the job counter early (e.g. to hold on to a reference to the job after you've
682             finished processing it), you can call the C method on the L object.
683              
684             $job->finished;
685              
686             =head2 failed_status
687              
688             $self->failed_status('Failed');
689              
690             Set a custom failed status.
691              
692             =head2 no_binary_oid
693              
694             $no_bin = $self->no_binary_oid;
695             $self->no_binary_oid(1);
696              
697             Set to 1 to disable binary ObjectIDs being returned by MangoX::Queue in the Job object.
698              
699             =head2 pending_status
700              
701             $self->pending_status('Pending');
702             $self->pending_status(['Pending', 'pending']);
703              
704             Set a custom pending status, can be an array ref.
705              
706             =head2 plugins
707              
708             my $plugins = $queue->plugins;
709              
710             Returns a hash containing the plugins registered with this queue.
711              
712             =head2 processing_status
713              
714             $self->processing_status('Processing');
715              
716             Set a custom processing status.
717              
718             =head2 retries
719              
720             my $retries = $queue->retries;
721             $queue->retries(5);
722              
723             The number of times a job will be picked up from the queue before it is
724             marked as failed.
725              
726             =head2 timeout
727              
728             my $timeout = $queue->timeout;
729             $queue->timeout(10);
730              
731             The time (in seconds) a job is allowed to stay in Retrieved state before
732             it is released back into Pending state. Defaults to 60 seconds.
733              
734             =head1 EVENTS
735              
736             L inherits from L and emits the following events.
737              
738             Events are emitted only for actions on the current queue object, not the entire queue.
739              
740             =head2 consumed
741              
742             on $queue consumed => sub {
743             my ($queue, $job) = @_;
744             # ...
745             };
746              
747             Emitted when an item is consumed (either via consume or fetch)
748              
749             =head2 dequeued
750              
751             on $queue dequeued => sub {
752             my ($queue, $job) = @_;
753             # ...
754             };
755              
756             Emitted when an item is dequeued
757              
758             =head2 enqueued
759              
760             on $queue enqueued => sub {
761             my ($queue, $job) = @_;
762             # ...
763             };
764              
765             Emitted when an item is enqueued
766              
767             =head2 concurrent_job_limit_reached
768              
769             on $queue enqueued => sub {
770             my ($queue, $concurrent_job_limit) = @_;
771             # ...
772             };
773              
774             Emitted when a job is found but the limit has been reached.
775              
776             =head1 METHODS
777              
778             L implements the following methods.
779              
780             =head2 consume
781              
782             # In blocking mode
783             while(my $job = consume $queue) {
784             # ...
785             }
786              
787             # In non-blocking mode
788             consume $queue sub {
789             my ($job) = @_;
790             # ...
791             };
792              
793             Waits for jobs to arrive on the queue, sleeping between queue checks using
794             L or L.
795              
796             Currently sets the status to 'Retrieved' before returning the job.
797              
798             =head2 dequeue
799              
800             my $job = fetch $queue;
801             dequeue $queue $job;
802              
803             Dequeues a job. Currently removes it from the collection.
804              
805             =head2 enqueue
806              
807             my $id = enqueue $queue 'job name';
808             my $id = enqueue $queue [ 'some', 'data' ];
809             my $id = enqueue $queue +{ foo => 'bar' };
810              
811             Add an item to the queue in blocking mode. The default priority is 1 and status is 'Pending'.
812              
813             You can set queue options including priority, created and status.
814              
815             my $id = enqueue $queue,
816             priority => 1,
817             created => bson_time,
818             status => 'Pending',
819             +{
820             foo => 'bar'
821             };
822              
823             For non-blocking mode, pass in a coderef as the final argument.
824              
825             my $id = enqueue $queue 'job_name' => sub {
826             # ...
827             };
828              
829             my $id = enqueue $queue priority => 1, +{
830             foo => 'bar',
831             } => sub {
832             # ...
833             };
834              
835             Sets the status to 'Pending' by default.
836              
837             =head2 fetch
838              
839             # In blocking mode
840             my $job = fetch $queue;
841              
842             # In non-blocking mode
843             fetch $queue sub {
844             my ($job) = @_;
845             # ...
846             };
847              
848             Fetch a single job from the queue, returning undef if no jobs are available.
849              
850             Currently sets job status to 'Retrieved'.
851              
852             =head2 get
853              
854             # In non-blocking mode
855             get $queue $id => sub {
856             my ($job) = @_;
857             # ...
858             };
859              
860             # In blocking mode
861             my $job = get $queue $id;
862              
863             Gets a job from the queue by ID. Doesn't change the job status.
864              
865             You can also pass in a job instead of an ID.
866              
867             $job = get $queue $job;
868              
869             =head2 get_options
870              
871             my $options = $queue->get_options;
872              
873             Returns the L options hash used by find_and_modify to
874             identify and update available queue items.
875              
876             =head2 release
877              
878             my $consumer = consume $queue sub {
879             # ...
880             };
881             release $queue $consumer;
882              
883             Releases a non-blocking consumer from watching a queue.
884              
885             =head2 requeue
886              
887             my $job = fetch $queue;
888             requeue $queue $job;
889              
890             Requeues a job. Sets the job status to 'Pending'.
891              
892             =head2 update
893              
894             my $job = fetch $queue;
895             $job->{status} = 'Failed';
896             update $queue $job;
897              
898             Updates a job in the queue.
899              
900             =head2 watch
901              
902             Wait for a job to enter a certain status.
903              
904             # In blocking mode
905             my $id = enqueue $queue 'test';
906             watch $queue $id, 'Complete'; # blocks until job is complete
907              
908             # In non-blocking mode
909             my $id = enqueue $queue 'test';
910             watch $queue $id, 'Complete' => sub {
911             # ...
912             };
913              
914             =head1 FUTURE JOBS
915              
916             Jobs can be queued in advance by setting a delay_until attribute:
917              
918             enqueue $queue delay_until => (time + 20), "job name";
919              
920             =head1 ERRORS
921              
922             Errors are reported by MangoX::Queue using callbacks and L
923              
924             To listen for all errors on a queue, subscribe to the 'error' event:
925              
926             $queue->on(error => sub {
927             my ($queue, $job, $error) = @_;
928             # ...
929             });
930              
931             To check for errors against an individual update, enqueue or dequeue call,
932             you can check for an error argument to the callback sub:
933              
934             enqueue $queue +$job => sub {
935             my ($job, $error) = @_;
936              
937             if($error) {
938             # ...
939             }
940             }
941              
942             =head1 CONTRIBUTORS
943              
944             =over
945              
946             =item Ben Vinnerd, ben@vinnerd.com
947             =item Olivier Duclos, github.com/oliwer
948              
949             =back
950              
951             =head1 SEE ALSO
952              
953             L, L, L
954              
955             =cut