File Coverage

blib/lib/Resque.pm
Criterion Covered Total %
statement 26 88 29.5
branch 0 30 0.0
condition 0 21 0.0
subroutine 9 25 36.0
pod 15 15 100.0
total 50 179 27.9


line stmt bran cond sub pod time code
1             package Resque;
2             # ABSTRACT: Redis-backed library for creating background jobs, placing them on multiple queues, and processing them later.
3             $Resque::VERSION = '0.41';
4 9     9   964670 use Moose;
  9         4430262  
  9         69  
5 9     9   69893 use Scalar::Util 'blessed';
  9         24  
  9         643  
6 9     9   69 use Moose::Util::TypeConstraints;
  9         16  
  9         108  
7 9     9   20649 use Class::Load;
  9         22  
  9         417  
8 9     9   6709 use Data::Compare;
  9         138587  
  9         3182  
9             with 'Resque::Pluggable';
10              
11 9     9   45554 use Resque::Job;
  9         158618  
  9         648  
12 9     9   6304 use Resque::Worker;
  9         4056  
  9         453  
13 9     9   5509 use Resque::Failures;
  9         3339  
  9         13012  
14              
15 0     0   0 sub _redis_class{ Class::Load::load_first_existing_class( 'Redis::Fast', 'Redis' ) }
16              
17              
18              
19             subtype 'Sugar::Redis' => as 'Object';
20             coerce 'Sugar::Redis' => from 'Str' => via {
21             _redis_class->new(
22             server => $_,
23             reconnect => 60,
24             every => 250,
25             encoding => undef
26             )
27             };
28              
29             coerce 'Sugar::Redis' => from 'HashRef' => via {
30             _redis_class->new((
31             reconnect => 60,
32             every => 250,
33             encoding => undef,
34             %$_,
35             ));
36             };
37              
38             has redis => (
39             is => 'ro',
40             lazy => 1,
41             coerce => 1,
42             isa => 'Sugar::Redis',
43             default => sub { _redis_class->new }
44             );
45              
46             has namespace => ( is => 'rw', default => sub { 'resque' } );
47              
48             has failures => (
49             is => 'rw',
50             lazy => 1,
51             default => sub { Resque::Failures->new( resque => $_[0] ) },
52             handles => [qw/ throw /]
53             );
54              
55             sub worker {
56 0     0 1 0 my $self = shift;
57 0         0 $self->worker_class->new( resque => $self );
58             }
59              
60             sub push {
61 0     0 1 0 my ( $self, $queue, $job ) = @_;
62 0 0       0 confess "Can't push an empty job." unless $job;
63 0         0 $self->_watch_queue($queue);
64 0 0 0     0 $job = $self->new_job($job) unless blessed $job && $job->isa('Resque::Job');
65 0         0 $self->redis->rpush( $self->key( queue => $queue ), $job->encode );
66             }
67              
68             sub pop {
69 0     0 1 0 my ( $self, $queue ) = @_;
70 0         0 my $payload = $self->redis->lpop($self->key( queue => $queue ));
71 0 0       0 return unless $payload;
72              
73 0         0 $self->new_job({
74             payload => $payload,
75             queue => $queue
76             });
77             }
78              
79             sub blpop {
80 0     0 1 0 my ( $self, $queues, $timeout ) = @_;
81 0   0     0 my ( $key, $payload ) = $self->redis->blpop(( map { $self->key( queue => $_ ) } @$queues ), $timeout || 0 );
  0         0  
82 0 0       0 return unless $payload;
83              
84             # clean prefix added by key().
85 0         0 my $prefix = $self->key('queue');
86 0         0 my ($queue) = $key =~ /^$prefix:(.+)$/;
87              
88 0         0 $self->new_job({
89             payload => $payload,
90             queue => $queue
91             });
92             }
93              
94             sub size {
95 0     0 1 0 my ( $self, $queue ) = @_;
96 0         0 $self->redis->llen( $self->key( queue => $queue ) );
97             }
98              
99             sub peek {
100 0     0 1 0 my ( $self, $queue, $start, $count ) = @_;
101 0   0     0 my $jobs = $self->list_range(
      0        
102             $self->key( queue => $queue ),
103             $start || 0, $count || 1
104             );
105 0         0 $_ = $self->new_job({ queue => $queue, payload => $_ }) for @$jobs;
106 0 0       0 return wantarray ? @$jobs : $jobs;
107             }
108              
109             sub queues {
110 0     0 1 0 my $self = shift;
111 0         0 my @queues = $self->redis->smembers( $self->key('queues') );
112 0 0       0 return wantarray ? @queues : \@queues;
113             }
114              
115             sub remove_queue {
116 0     0 1 0 my ( $self, $queue ) = @_;
117 0         0 $self->redis->srem( $self->key('queues'), $queue );
118 0         0 $self->redis->del( $self->key( queue => $queue ) );
119             }
120              
121             sub create_queue {
122 0     0 1 0 my ( $self, $queue ) = @_;
123 0         0 $self->_watch_queue( $queue );
124             }
125              
126             sub mass_dequeue {
127 0     0 1 0 my ( $self, $target ) = @_;
128             confess("Can't mass_dequeue() without queue and class names.")
129             unless $target
130             and $target->{queue}
131 0 0 0     0 and $target->{class};
      0        
132              
133 0         0 my $queue = $self->key( queue => $target->{queue} );
134 0         0 my $removed = 0;
135 0         0 for my $item ( $self->redis->lrange( $queue, 0, -1 ) ) {
136 0         0 my $job_item = $self->new_job( $item );
137 0 0       0 if ( $job_item->class eq $target->{class} ) {
138 0 0       0 if ( exists $target->{args} ) {
139 0 0       0 next unless Compare( $job_item->args, $target->{args} );
140             }
141              
142 0         0 $removed += $self->redis->lrem( $queue, 0, $item );
143             }
144             }
145              
146 0         0 $removed;
147             }
148              
149             sub new_job {
150 0     0 1 0 my ( $self, $job ) = @_;
151              
152 0 0 0     0 if ( $job && ref $job && ref $job eq 'HASH' ) {
    0 0        
153 0         0 return $self->job_class->new({ resque => $self, %$job });
154             }
155             elsif ( $job ) {
156 0         0 return $self->job_class->new({ resque => $self, payload => $job });
157             }
158 0         0 confess "Can't build an empty Resque::Job object.";
159             }
160              
161             sub key {
162 2     2 1 5 my $self = shift;
163 2         61 join( ':', $self->namespace, @_ );
164             }
165              
166             sub keys {
167 0     0 1   my $self = shift;
168 0           my @keys = $self->redis->keys( $self->key('*') );
169 0 0         return wantarray ? @keys : \@keys;
170             }
171              
172             sub flush_namespace {
173 0     0 1   my $self = shift;
174 0 0         if ( my @keys = $self->keys ) {
175 0           return $self->redis->del( @keys );
176             }
177 0           return 0;
178             }
179              
180             sub list_range {
181 0     0 1   my ( $self, $key, $start, $count ) = @_;
182 0 0         my $stop = $count > 0 ? $start + $count - 1 : $count;
183 0           my @items = $self->redis->lrange( $key, $start, $stop );
184 0           return \@items;
185             }
186              
187             # Used internally to keep track of which queues we've created.
188             # Don't call this directly.
189             sub _watch_queue {
190 0     0     my ( $self, $queue ) = @_;
191 0           $self->redis->sadd( $self->key('queues'), $queue );
192             }
193              
194             __PACKAGE__->meta->make_immutable();
195              
196             __END__
197              
198             =pod
199              
200             =encoding UTF-8
201              
202             =head1 NAME
203              
204             Resque - Redis-backed library for creating background jobs, placing them on multiple queues, and processing them later.
205              
206             =head1 VERSION
207              
208             version 0.41
209              
210             =head1 SYNOPSIS
211              
212             First you create a Resque instance where you configure the L<Redis> backend and then you can
213             start sending jobs to be done by workers:
214              
215             use Resque;
216              
217             my $r = Resque->new( redis => '127.0.0.1:6379' );
218              
219             $r->push( my_queue => {
220             class => 'My::Task',
221             args => [ 'Hello world!' ]
222             });
223              
224             Background jobs can be any Perl module that implements a perform() function. The Resque::Job object is
225             passed as the only argument to this function:
226              
227             package My::Task;
228             use strict;
229             use 5.10.0;
230              
231             sub perform {
232             my $job = shift;
233             say $job->args->[0];
234             }
235              
236             1;
237              
238             Finally, you run your jobs by instantiating a L<Resque::Worker> and telling it to listen to one or more
239             queues:
240              
241             use Resque;
242              
243             my $w = Resque->new( redis => '127.0.0.1:6379' )->worker;
244             $w->add_queue('my_queue');
245             $w->work;
246              
247             =head1 DESCRIPTION
248              
249             Resque is a Redis-backed library for creating background jobs, placing them on multiple queues,
250             and processing them later.
251              
252             This library is a perl port of the original Ruby one: L<https://github.com/defunkt/resque>
253             My main goal doing this port is to use the same backend to be able to manage the system using
254             ruby's resque-server webapp.
255              
256             As extracted from the original docs, the main features of Resque are:
257              
258             Resque workers can be distributed between multiple machines, support priorities, are resilient to
259             memory leaks, tell you what they're doing, and expect failure.
260              
261             Resque queues are persistent; support constant time, atomic push and pop (thanks to Redis); provide
262             visibility into their contents; and store jobs as simple JSON hashes.
263              
264             The Resque frontend tells you what workers are doing, what workers are not doing, what queues you're
265             using, what's in those queues, provides general usage stats, and helps you track failures.
266              
267             A lot more about Resque can be read on the original blog post: L<http://github.com/blog/542-introducing-resque>
268              
269             =head1 ATTRIBUTES
270              
271             =head2 redis
272              
273             Redis instance for this Resque instance. Accepts a string, hash reference, L<Redis>, L<Redis::Fast> or any other object that behaves like those.
274              
275             When a string is passed in, it will be used as the server argument of a new client object. When L<Redis::Fast> is available this
276             will be used, when not the pure perl L<Redis> client will be used instead.
277              
278             =head2 namespace
279              
280             This is useful to run multiple queue systems with the same Redis backend.
281              
282             By default 'resque' is used.
283              
284             =head2 failures
285              
286             Failures handler. See L<Resque::Failures>.
287              
288             =head1 METHODS
289              
290             =head2 worker
291              
292             Returns a new L<Resque::Worker> on this resque instance.
293             It can have plugin/roles applied. See L<Resque::Pluggable>.
294              
295             my $worker = $r->worker();
296              
297             =head2 push
298              
299             Pushes a job onto a queue. Queue name should be a string and the
300             item should be a Resque::Job object or a hashref containing:
301              
302             class - The String name of the job class to run.
303             args - An arrayref of arguments to pass to the job.
304              
305             Returns redis response.
306              
307             Example
308              
309             $resque->push( archive => { class => 'Archive', args => [ 35, 'tar' ] } )
310              
311             =head2 pop
312              
313             Pops a job off a queue. Queue name should be a string.
314              
315             Returns a Resque::Job object.
316              
317             my $resque_job = $r->pop( 'queue_name' );
318              
319             =head2 blpop
320              
321             Pops a job off an arrayref of queues prioritizing by order. Queue names should be strings.
322             It will block until a job is popped or the optional timeout (in seconds) expires.
323              
324             Returns a Resque::Job object.
325              
326             my $resque_job = $r->blpop( [qw/ queue1 queue2 queue3/], 60 );
327              
328             =head2 size
329              
330             Returns the size of a queue.
331             Queue name should be a string.
332              
333             my $size = $r->size( 'queue_name' );
334              
335             =head2 peek
336              
337             Returns an array of jobs currently queued, or an arrayref in scalar context.
338              
339             First argument is queue name and an optional second and third are
340             start and count values that can be used for pagination.
341             start is the item to begin, count is how many items to return.
342              
343             Passing a negative count argument will set a stop value instead
344             of count. So, passing -1 will return full list, -2 all but last
345             element and so on.
346              
347             To get the 3rd page of a 30 item, paginated list (start is 0-based) one would use:
348              
349             my @jobs = $resque->peek('my_queue', 60, 30)
350              
351             =head2 queues
352              
353             Returns an array of all known Resque queues, or an arrayref in scalar context.
354              
355             my @queues = $r->queues();
356              
357             =head2 remove_queue
358              
359             Given a queue name, completely deletes the queue.
360              
361             $r->remove_queue( 'my_queue' );
362              
363             =head2 create_queue
364              
365             Given a queue name, creates an empty queue.
366              
367             $r->create_queue( 'my_queue' );
368              
369             =head2 mass_dequeue
370              
371             Removes all matching jobs from a queue. Expects a hashref
372             with queue name, a class name, and, optionally, args.
373              
374             Returns the number of jobs destroyed.
375              
376             If no args are provided, it will remove all jobs of the class
377             provided.
378              
379             That is, for these two jobs:
380              
381             { 'class' => 'UpdateGraph', 'args' => ['perl'] }
382             { 'class' => 'UpdateGraph', 'args' => ['ruby'] }
383              
384             The following call will remove both:
385              
386             my $num_removed = $resque->mass_dequeue({
387             queue => 'test',
388             class => 'UpdateGraph'
389             });
390              
391             Whereas specifying args will only remove the 2nd job:
392              
393             my $num_removed = $resque->mass_dequeue({
394             queue => 'test',
395             class => 'UpdateGraph',
396             args => ['ruby']
397             });
398              
399             Using this method without args can be potentially very slow and
400             memory intensive, depending on the size of your queue, as it loads
401             all jobs into an array before processing.
402              
403             =head2 new_job
404              
405             Build a L<Resque::Job> object on this system for the given
406             hashref or string(payload for object).
407              
408             L<Resque::Job> class can be extended through roles/plugins.
409             See L<Resque::Pluggable>.
410              
411             $r->new_job( $job_or_job_hashref );
412              
413             =head2 key
414              
415             Concatenate $self->namespace with the received array of names
416             to build a redis key name for this resque instance.
417              
418             =head2 keys
419              
420             Returns an array of all known Resque keys in Redis, or an arrayref in scalar context.
421             Redis' KEYS operation is O(N) for the keyspace, so be careful this can be slow for
422             big databases.
423              
424             =head2 flush_namespace
425              
426             This method will delete every trace of this Resque system on
427             the redis() backend.
428              
429             $r->flush_namespace();
430              
431             =head2 list_range
432              
433             Does the dirty work of fetching a range of items from a Redis list.
434              
435             my $items_ref = $r->list_range( $key, $start, $count );
436              
437             =head1 Queue manipulation
438              
439             =head1 HELPER METHODS
440              
441             =head1 BUGS
442              
443             As in any piece of software there might be bugs around.
444             If you found one, please report it on RT or at the github repo:
445              
446             L<https://github.com/diegok/resque-perl>
447              
448             Pull requests are also very welcome, but please include tests demonstrating
449             what you've fixed.
450              
451             =head1 TODO
452              
453             =over 4
454              
455             =item *
456              
457             More tests on worker fork and signal handling.
458              
459             =back
460              
461             =head1 SEE ALSO
462              
463             =over 4
464              
465             =item *
466              
467             L<Gearman::Client>
468              
469             =item *
470              
471             L<TheSchwartz>
472              
473             =item *
474              
475             L<Queue::Q4M>
476              
477             =back
478              
479             =head1 AUTHOR
480              
481             Diego Kuperman <diego@freekeylabs.com>
482              
483             =head1 COPYRIGHT AND LICENSE
484              
485             This software is copyright (c) 2021 by Diego Kuperman.
486              
487             This is free software; you can redistribute it and/or modify it under
488             the same terms as the Perl 5 programming language system itself.
489              
490             =cut