File Coverage

blib/lib/Resque.pm
Criterion Covered Total %
statement 26 86 30.2
branch 0 30 0.0
condition 0 21 0.0
subroutine 9 25 36.0
pod 15 15 100.0
total 50 177 28.2


line stmt bran cond sub pod time code
1             package Resque;
2             # ABSTRACT: Redis-backed library for creating background jobs, placing them on multiple queues, and processing them later.
3             $Resque::VERSION = '0.40';
4 9     9   942435 use Moose;
  9         4301598  
  9         68  
5 9     9   65453 use Scalar::Util 'blessed';
  9         19  
  9         614  
6 9     9   58 use Moose::Util::TypeConstraints;
  9         19  
  9         101  
7 9     9   19370 use Class::Load;
  9         21  
  9         377  
8 9     9   6486 use Data::Compare;
  9         131804  
  9         70  
9             with 'Resque::Pluggable';
10              
11 9     9   46254 use Resque::Job;
  9         150990  
  9         559  
12 9     9   6211 use Resque::Worker;
  9         3935  
  9         440  
13 9     9   5704 use Resque::Failures;
  9         3523  
  9         12576  
14              
15 0     0   0 sub _redis_class{ Class::Load::load_first_existing_class( 'Redis::Fast', 'Redis' ) }
16              
17              
18              
19             subtype 'Sugar::Redis' => as 'Object';
20             coerce 'Sugar::Redis' => from 'Str' => via {
21             _redis_class->new(
22             server => $_,
23             reconnect => 60,
24             every => 250,
25             encoding => undef
26             )
27             };
28              
29             coerce 'Sugar::Redis' => from 'HashRef' => via {
30             _redis_class->new((
31             reconnect => 60,
32             every => 250,
33             encoding => undef,
34             %$_,
35             ));
36             };
37              
38             has redis => (
39             is => 'ro',
40             lazy => 1,
41             coerce => 1,
42             isa => 'Sugar::Redis',
43             default => sub { _redis_class->new }
44             );
45              
46             has namespace => ( is => 'rw', default => sub { 'resque' } );
47              
48             has failures => (
49             is => 'rw',
50             lazy => 1,
51             default => sub { Resque::Failures->new( resque => $_[0] ) },
52             handles => [qw/ throw /]
53             );
54              
55             sub worker {
56 0     0 1 0 my $self = shift;
57 0         0 $self->worker_class->new( resque => $self );
58             }
59              
60             sub push {
61 0     0 1 0 my ( $self, $queue, $job ) = @_;
62 0 0       0 confess "Can't push an empty job." unless $job;
63 0         0 $self->_watch_queue($queue);
64 0 0 0     0 $job = $self->new_job($job) unless blessed $job && $job->isa('Resque::Job');
65 0         0 $self->redis->rpush( $self->key( queue => $queue ), $job->encode );
66             }
67              
68             sub pop {
69 0     0 1 0 my ( $self, $queue ) = @_;
70 0         0 my $payload = $self->redis->lpop($self->key( queue => $queue ));
71 0 0       0 return unless $payload;
72              
73 0         0 $self->new_job({
74             payload => $payload,
75             queue => $queue
76             });
77             }
78              
79             sub blpop {
80 0     0 1 0 my ( $self, $queues, $timeout ) = @_;
81 0   0     0 my ( $key, $payload ) = $self->redis->blpop(( map { $self->key( queue => $_ ) } @$queues ), $timeout || 0 );
  0         0  
82 0 0       0 return unless $payload;
83              
84 0         0 $self->new_job({
85             payload => $payload,
86             queue => (split(':', $key))[-1]
87             });
88             }
89              
90             sub size {
91 0     0 1 0 my ( $self, $queue ) = @_;
92 0         0 $self->redis->llen( $self->key( queue => $queue ) );
93             }
94              
95             sub peek {
96 0     0 1 0 my ( $self, $queue, $start, $count ) = @_;
97 0   0     0 my $jobs = $self->list_range(
      0        
98             $self->key( queue => $queue ),
99             $start || 0, $count || 1
100             );
101 0         0 $_ = $self->new_job({ queue => $queue, payload => $_ }) for @$jobs;
102 0 0       0 return wantarray ? @$jobs : $jobs;
103             }
104              
105             sub queues {
106 0     0 1 0 my $self = shift;
107 0         0 my @queues = $self->redis->smembers( $self->key('queues') );
108 0 0       0 return wantarray ? @queues : \@queues;
109             }
110              
111             sub remove_queue {
112 0     0 1 0 my ( $self, $queue ) = @_;
113 0         0 $self->redis->srem( $self->key('queues'), $queue );
114 0         0 $self->redis->del( $self->key( queue => $queue ) );
115             }
116              
117             sub create_queue {
118 0     0 1 0 my ( $self, $queue ) = @_;
119 0         0 $self->_watch_queue( $queue );
120             }
121              
122             sub mass_dequeue {
123 0     0 1 0 my ( $self, $target ) = @_;
124             confess("Can't mass_dequeue() without queue and class names.")
125             unless $target
126             and $target->{queue}
127 0 0 0     0 and $target->{class};
      0        
128              
129 0         0 my $queue = $self->key( queue => $target->{queue} );
130 0         0 my $removed = 0;
131 0         0 for my $item ( $self->redis->lrange( $queue, 0, -1 ) ) {
132 0         0 my $job_item = $self->new_job( $item );
133 0 0       0 if ( $job_item->class eq $target->{class} ) {
134 0 0       0 if ( exists $target->{args} ) {
135 0 0       0 next unless Compare( $job_item->args, $target->{args} );
136             }
137              
138 0         0 $removed += $self->redis->lrem( $queue, 0, $item );
139             }
140             }
141              
142 0         0 $removed;
143             }
144              
145             sub new_job {
146 0     0 1 0 my ( $self, $job ) = @_;
147              
148 0 0 0     0 if ( $job && ref $job && ref $job eq 'HASH' ) {
    0 0        
149 0         0 return $self->job_class->new({ resque => $self, %$job });
150             }
151             elsif ( $job ) {
152 0         0 return $self->job_class->new({ resque => $self, payload => $job });
153             }
154 0         0 confess "Can't build an empty Resque::Job object.";
155             }
156              
157             sub key {
158 2     2 1 5 my $self = shift;
159 2         63 join( ':', $self->namespace, @_ );
160             }
161              
162             sub keys {
163 0     0 1   my $self = shift;
164 0           my @keys = $self->redis->keys( $self->key('*') );
165 0 0         return wantarray ? @keys : \@keys;
166             }
167              
168             sub flush_namespace {
169 0     0 1   my $self = shift;
170 0 0         if ( my @keys = $self->keys ) {
171 0           return $self->redis->del( @keys );
172             }
173 0           return 0;
174             }
175              
176             sub list_range {
177 0     0 1   my ( $self, $key, $start, $count ) = @_;
178 0 0         my $stop = $count > 0 ? $start + $count - 1 : $count;
179 0           my @items = $self->redis->lrange( $key, $start, $stop );
180 0           return \@items;
181             }
182              
183             # Used internally to keep track of which queues we've created.
184             # Don't call this directly.
185             sub _watch_queue {
186 0     0     my ( $self, $queue ) = @_;
187 0           $self->redis->sadd( $self->key('queues'), $queue );
188             }
189              
190             __PACKAGE__->meta->make_immutable();
191              
192             __END__
193              
194             =pod
195              
196             =encoding UTF-8
197              
198             =head1 NAME
199              
200             Resque - Redis-backed library for creating background jobs, placing them on multiple queues, and processing them later.
201              
202             =head1 VERSION
203              
204             version 0.40
205              
206             =head1 SYNOPSIS
207              
208             First you create a Resque instance where you configure the L<Redis> backend and then you can
209             start sending jobs to be done by workers:
210              
211             use Resque;
212              
213             my $r = Resque->new( redis => '127.0.0.1:6379' );
214              
215             $r->push( my_queue => {
216             class => 'My::Task',
217             args => [ 'Hello world!' ]
218             });
219              
220             Background jobs can be any perl module that implements a perform() function. The Resque::Job object is
221             passed as the only argument to this function:
222              
223             package My::Task;
224             use strict;
225             use 5.10.0;
226              
227             sub perform {
228             my $job = shift;
229             say $job->args->[0];
230             }
231              
232             1;
233              
234             Finally, you run your jobs by instantiating a L<Resque::Worker> and telling it to listen to one or more
235             queues:
236              
237             use Resque;
238              
239             my $w = Resque->new( redis => '127.0.0.1:6379' )->worker;
240             $w->add_queue('my_queue');
241             $w->work;
242              
243             =head1 DESCRIPTION
244              
245             Resque is a Redis-backed library for creating background jobs, placing them on multiple queues,
246             and processing them later.
247              
248             This library is a perl port of the original Ruby one: L<https://github.com/defunkt/resque>
249             My main goal doing this port is to use the same backend to be able to manage the system using
250             ruby's resque-server webapp.
251              
252             As extracted from the original docs, the main features of Resque are:
253              
254             Resque workers can be distributed between multiple machines, support priorities, are resilient to
255             memory leaks, tell you what they're doing, and expect failure.
256              
257             Resque queues are persistent; support constant time, atomic push and pop (thanks to Redis); provide
258             visibility into their contents; and store jobs as simple JSON hashes.
259              
260             The Resque frontend tells you what workers are doing, what workers are not doing, what queues you're
261             using, what's in those queues, provides general usage stats, and helps you track failures.
262              
263             A lot more about Resque can be read on the original blog post: L<http://github.com/blog/542-introducing-resque>
264              
265             =head1 ATTRIBUTES
266              
267             =head2 redis
268              
269             Redis instance for this Resque instance. Accepts a string, hash reference, L<Redis>, L<Redis::Fast> or any other object that behaves like those.
270              
271             When a string is passed in, it will be used as the server argument of a new client object. When L<Redis::Fast> is available this
272             will be used, when not the pure perl L<Redis> client will be used instead.
273              
274             =head2 namespace
275              
276             This is useful to run multiple queue systems with the same Redis backend.
277              
278             By default 'resque' is used.
279              
280             =head2 failures
281              
282             Failures handler. See L<Resque::Failures>.
283              
284             =head1 METHODS
285              
286             =head2 worker
287              
288             Returns a new L<Resque::Worker> on this resque instance.
289             It can have plugin/roles applied. See L<Resque::Pluggable>.
290              
291             my $worker = $r->worker();
292              
293             =head2 push
294              
295             Pushes a job onto a queue. Queue name should be a string and the
296             item should be a Resque::Job object or a hashref containing:
297              
298             class - The String name of the job class to run.
299             args - An arrayref of arguments to pass the job.
300              
301             Returns redis response.
302              
303             Example
304              
305             $resque->push( archive => { class => 'Archive', args => [ 35, 'tar' ] } )
306              
307             =head2 pop
308              
309             Pops a job off a queue. Queue name should be a string.
310              
311             Returns a Resque::Job object, or nothing when the queue is empty.
312              
313             my $resque_job = $r->pop( 'queue_name' );
314              
315             =head2 blpop
316              
317             Pops a job off an arrayref of queues prioritizing by order. Queue names should be strings.
318             It will block until a job is popped or the optional timeout in seconds expires.
319              
320             Returns a Resque::Job object, or nothing when no job was received (e.g. the timeout expired).
321              
322             my $resque_job = $r->blpop( [qw/ queue1 queue2 queue3/], 60 );
323              
324             =head2 size
325              
326             Returns the size of a queue.
327             Queue name should be a string.
328              
329             my $size = $r->size( 'queue_name' );
330              
331             =head2 peek
332              
333             Returns an array of jobs currently queued, or an arrayref in scalar context.
334              
335             First argument is queue name and an optional second and third are
336             start and count values that can be used for pagination.
337             start is the item to begin, count is how many items to return.
338              
339             Passing a negative count argument will set a stop value instead
340             of count. So, passing -1 will return full list, -2 all but last
341             element and so on.
342              
343             To get the 3rd page of a 30 item, paginated list one would use:
344              
345             my @jobs = $resque->peek('my_queue', 60, 30)
346              
347             =head2 queues
348              
349             Returns an array of all known Resque queues, or an arrayref in scalar context.
350              
351             my @queues = $r->queues();
352              
353             =head2 remove_queue
354              
355             Given a queue name, completely deletes the queue.
356              
357             $r->remove_queue( 'my_queue' );
358              
359             =head2 create_queue
360              
361             Given a queue name, creates an empty queue.
362              
363             $r->create_queue( 'my_queue' );
364              
365             =head2 mass_dequeue
366              
367             Removes all matching jobs from a queue. Expects a hashref
368             with queue name, a class name, and, optionally, args.
369              
370             Returns the number of jobs destroyed.
371              
372             If no args are provided, it will remove all jobs of the class
373             provided.
374              
375             That is, for these two jobs:
376              
377             { 'class' => 'UpdateGraph', 'args' => ['perl'] }
378             { 'class' => 'UpdateGraph', 'args' => ['ruby'] }
379              
380             The following call will remove both:
381              
382             my $num_removed = $resque->mass_dequeue({
383             queue => 'test',
384             class => 'UpdateGraph'
385             });
386              
387             Whereas specifying args will only remove the 2nd job:
388              
389             my $num_removed = $resque->mass_dequeue({
390             queue => 'test',
391             class => 'UpdateGraph',
392             args => ['ruby']
393             });
394              
395             Using this method without args can be potentially very slow and
396             memory intensive, depending on the size of your queue, as it loads
397             all jobs into an array before processing.
398              
399             =head2 new_job
400              
401             Build a L<Resque::Job> object on this system for the given
402             hashref or string(payload for object).
403              
404             L<Resque::Job> class can be extended thru roles/plugins.
405             See L<Resque::Pluggable>.
406              
407             $r->new_job( $job_or_job_hashref );
408              
409             =head2 key
410              
411             Concatenate $self->namespace with the received array of names
412             to build a redis key name for this resque instance.
413              
414             =head2 keys
415              
416             Returns an array of all known Resque keys in Redis, or an arrayref in scalar context.
417             Redis' KEYS operation is O(N) for the keyspace, so be careful this can be slow for
418             big databases.
419              
420             =head2 flush_namespace
421              
422             This method will delete every trace of this Resque system on
423             the redis() backend.
424              
425             $r->flush_namespace();
426              
427             =head2 list_range
428              
429             Does the dirty work of fetching a range of items from a Redis list.
430              
431             my $items_ref = $r->list_range( $key, $start, $count );
432              
433             =head1 Queue manipulation
434              
435             =head1 HELPER METHODS
436              
437             =head1 BUGS
438              
439             As in any piece of software there might be bugs around.
440             If you found one, please report it on RT or at the github repo:
441              
442             L<https://github.com/diegok/resque-perl>
443              
444             Pull requests are also very welcome, but please include tests demonstrating
445             what you've fixed.
446              
447             =head1 TODO
448              
449             =over 4
450              
451             =item *
452              
453             More tests on worker fork and signal handling.
454              
455             =back
456              
457             =head1 SEE ALSO
458              
459             =over 4
460              
461             =item *
462              
463             L<Gearman::Client>
464              
465             =item *
466              
467             L<TheSchwartz>
468              
469             =item *
470              
471             L<Queue::Q4M>
472              
473             =back
474              
475             =head1 AUTHOR
476              
477             Diego Kuperman <diego@freekeylabs.com>
478              
479             =head1 COPYRIGHT AND LICENSE
480              
481             This software is copyright (c) 2021 by Diego Kuperman.
482              
483             This is free software; you can redistribute it and/or modify it under
484             the same terms as the Perl 5 programming language system itself.
485              
486             =cut