File Coverage

blib/lib/Resque.pm
Criterion Covered Total %
statement 26 95 27.3
branch 0 30 0.0
condition 0 21 0.0
subroutine 9 27 33.3
pod 16 16 100.0
total 51 189 26.9


line stmt bran cond sub pod time code
1             package Resque;
2             # ABSTRACT: Redis-backed library for creating background jobs, placing them on multiple queues, and processing them later.
3             $Resque::VERSION = '0.42';
4 9     9   964046 use Moose;
  9         4314137  
  9         65  
5 9     9   66472 use Scalar::Util 'blessed';
  9         22  
  9         580  
6 9     9   60 use Moose::Util::TypeConstraints;
  9         15  
  9         109  
7 9     9   19573 use Class::Load;
  9         18  
  9         397  
8 9     9   6776 use Data::Compare;
  9         133434  
  9         3093  
9             with 'Resque::Pluggable';
10              
11 9     9   43819 use Resque::Job;
  9         149965  
  9         524  
12 9     9   6068 use Resque::Worker;
  9         4111  
  9         420  
13 9     9   5206 use Resque::Failures;
  9         3187  
  9         12812  
14              
15 0     0   0 sub _redis_class{ Class::Load::load_first_existing_class( 'Redis::Fast', 'Redis' ) }
16              
17              
18              
19             subtype 'Sugar::Redis' => as 'Object';
20             coerce 'Sugar::Redis' => from 'Str' => via {
21             _redis_class->new(
22             server => $_,
23             reconnect => 60,
24             every => 250,
25             encoding => undef
26             )
27             };
28              
29             coerce 'Sugar::Redis' => from 'HashRef' => via {
30             _redis_class->new((
31             reconnect => 60,
32             every => 250,
33             encoding => undef,
34             %$_,
35             ));
36             };
37              
38             has redis => (
39             is => 'ro',
40             lazy => 1,
41             coerce => 1,
42             isa => 'Sugar::Redis',
43             default => sub { _redis_class->new }
44             );
45              
46             has namespace => ( is => 'rw', default => sub { 'resque' } );
47              
48             has failures => (
49             is => 'rw',
50             lazy => 1,
51             default => sub { Resque::Failures->new( resque => $_[0] ) },
52             handles => [qw/ throw /]
53             );
54              
55             sub worker {
56 0     0 1 0 my $self = shift;
57 0         0 $self->worker_class->new( resque => $self );
58             }
59              
60             sub push {
61 0     0 1 0 my ( $self, $queue, $job ) = @_;
62 0 0       0 confess "Can't push an empty job." unless $job;
63 0         0 $self->_watch_queue($queue);
64 0 0 0     0 $job = $self->new_job($job) unless blessed $job && $job->isa('Resque::Job');
65 0         0 $self->redis->rpush( $self->key( queue => $queue ), $job->encode );
66             }
67              
68             sub pop {
69 0     0 1 0 my ( $self, $queue ) = @_;
70 0         0 my $payload = $self->redis->lpop($self->key( queue => $queue ));
71 0 0       0 return unless $payload;
72              
73 0         0 $self->new_job({
74             payload => $payload,
75             queue => $queue
76             });
77             }
78              
79             sub blpop {
80 0     0 1 0 my ( $self, $queues, $timeout ) = @_;
81 0   0     0 my ( $key, $payload ) = $self->redis->blpop(( map { $self->key( queue => $_ ) } @$queues ), $timeout || 0 );
  0         0  
82 0 0       0 return unless $payload;
83              
84             # clean prefix added by key().
85 0         0 my $prefix = $self->key('queue');
86 0         0 my ($queue) = $key =~ /^$prefix:(.+)$/;
87              
88 0         0 $self->new_job({
89             payload => $payload,
90             queue => $queue
91             });
92             }
93              
94             sub size {
95 0     0 1 0 my ( $self, $queue ) = @_;
96 0         0 $self->redis->llen( $self->key( queue => $queue ) );
97             }
98              
99             sub size_map {
100 0     0 1 0 my ( $self, $queues ) = @_;
101 0         0 my $res = {};
102 0         0 for my $q (@$queues) {
103 0     0   0 $self->redis->llen( $self->key( queue => $q ), sub{ $res->{$q} = $_[0] } )
104 0         0 }
105 0         0 $self->redis->wait_all_responses;
106 0         0 $res;
107             }
108              
109             sub peek {
110 0     0 1 0 my ( $self, $queue, $start, $count ) = @_;
111 0   0     0 my $jobs = $self->list_range(
      0        
112             $self->key( queue => $queue ),
113             $start || 0, $count || 1
114             );
115 0         0 $_ = $self->new_job({ queue => $queue, payload => $_ }) for @$jobs;
116 0 0       0 return wantarray ? @$jobs : $jobs;
117             }
118              
119             sub queues {
120 0     0 1 0 my $self = shift;
121 0         0 my @queues = $self->redis->smembers( $self->key('queues') );
122 0 0       0 return wantarray ? @queues : \@queues;
123             }
124              
125             sub remove_queue {
126 0     0 1 0 my ( $self, $queue ) = @_;
127 0         0 $self->redis->srem( $self->key('queues'), $queue );
128 0         0 $self->redis->del( $self->key( queue => $queue ) );
129             }
130              
131             sub create_queue {
132 0     0 1 0 my ( $self, $queue ) = @_;
133 0         0 $self->_watch_queue( $queue );
134             }
135              
136             sub mass_dequeue {
137 0     0 1 0 my ( $self, $target ) = @_;
138             confess("Can't mass_dequeue() without queue and class names.")
139             unless $target
140             and $target->{queue}
141 0 0 0     0 and $target->{class};
      0        
142              
143 0         0 my $queue = $self->key( queue => $target->{queue} );
144 0         0 my $removed = 0;
145 0         0 for my $item ( $self->redis->lrange( $queue, 0, -1 ) ) {
146 0         0 my $job_item = $self->new_job( $item );
147 0 0       0 if ( $job_item->class eq $target->{class} ) {
148 0 0       0 if ( exists $target->{args} ) {
149 0 0       0 next unless Compare( $job_item->args, $target->{args} );
150             }
151              
152 0         0 $removed += $self->redis->lrem( $queue, 0, $item );
153             }
154             }
155              
156 0         0 $removed;
157             }
158              
159             sub new_job {
160 0     0 1 0 my ( $self, $job ) = @_;
161              
162 0 0 0     0 if ( $job && ref $job && ref $job eq 'HASH' ) {
    0 0        
163 0         0 return $self->job_class->new({ resque => $self, %$job });
164             }
165             elsif ( $job ) {
166 0         0 return $self->job_class->new({ resque => $self, payload => $job });
167             }
168 0         0 confess "Can't build an empty Resque::Job object.";
169             }
170              
171             sub key {
172 2     2 1 6 my $self = shift;
173 2         49 join( ':', $self->namespace, @_ );
174             }
175              
176             sub keys {
177 0     0 1   my $self = shift;
178 0           my @keys = $self->redis->keys( $self->key('*') );
179 0 0         return wantarray ? @keys : \@keys;
180             }
181              
182             sub flush_namespace {
183 0     0 1   my $self = shift;
184 0 0         if ( my @keys = $self->keys ) {
185 0           return $self->redis->del( @keys );
186             }
187 0           return 0;
188             }
189              
190             sub list_range {
191 0     0 1   my ( $self, $key, $start, $count ) = @_;
192 0 0         my $stop = $count > 0 ? $start + $count - 1 : $count;
193 0           my @items = $self->redis->lrange( $key, $start, $stop );
194 0           return \@items;
195             }
196              
197             # Used internally to keep track of which queues we've created.
198             # Don't call this directly.
199             sub _watch_queue {
200 0     0     my ( $self, $queue ) = @_;
201 0           $self->redis->sadd( $self->key('queues'), $queue );
202             }
203              
204             __PACKAGE__->meta->make_immutable();
205              
206             __END__
207              
208             =pod
209              
210             =encoding UTF-8
211              
212             =head1 NAME
213              
214             Resque - Redis-backed library for creating background jobs, placing them on multiple queues, and processing them later.
215              
216             =head1 VERSION
217              
218             version 0.42
219              
220             =head1 SYNOPSIS
221              
222             First you create a Resque instance where you configure the L<Redis> backend and then you can
223             start sending jobs to be done by workers:
224              
225             use Resque;
226              
227             my $r = Resque->new( redis => '127.0.0.1:6379' );
228              
229             $r->push( my_queue => {
230             class => 'My::Task',
231             args => [ 'Hello world!' ]
232             });
233              
234             Background jobs can be any perl module that implements a perform() function. The Resque::Job object is
235             passed as the only argument to this function:
236              
237             package My::Task;
238             use strict;
239             use 5.10.0;
240              
241             sub perform {
242             my $job = shift;
243             say $job->args->[0];
244             }
245              
246             1;
247              
248             Finally, you run your jobs by instantiating a L<Resque::Worker> and telling it to listen to one or more
249             queues:
250              
251             use Resque;
252              
253             my $w = Resque->new( redis => '127.0.0.1:6379' )->worker;
254             $w->add_queue('my_queue');
255             $w->work;
256              
257             =head1 DESCRIPTION
258              
259             Resque is a Redis-backed library for creating background jobs, placing them on multiple queues,
260             and processing them later.
261              
262             This library is a perl port of the original Ruby one: L<https://github.com/defunkt/resque>
263             My main goal doing this port is to use the same backend to be able to manage the system using
264             ruby's resque-server webapp.
265              
266             As extracted from the original docs, the main features of Resque are:
267              
268             Resque workers can be distributed between multiple machines, support priorities, are resilient to
269             memory leaks, tell you what they're doing, and expect failure.
270              
271             Resque queues are persistent; support constant time, atomic push and pop (thanks to Redis); provide
272             visibility into their contents; and store jobs as simple JSON hashes.
273              
274             The Resque frontend tells you what workers are doing, what workers are not doing, what queues you're
275             using, what's in those queues, provides general usage stats, and helps you track failures.
276              
277             A lot more about Resque can be read on the original blog post: L<http://github.com/blog/542-introducing-resque>
278              
279             =head1 ATTRIBUTES
280              
281             =head2 redis
282              
283             Redis instance for this Resque instance. Accepts a string, hash reference, L<Redis>, L<Redis::Fast> or any other object that behaves like those.
284              
285             When a string is passed in, it will be used as the server argument of a new client object. When L<Redis::Fast> is available this
286             will be used, when not the pure perl L<Redis> client will be used instead.
287              
288             =head2 namespace
289              
290             This is useful to run multiple queue systems with the same Redis backend.
291              
292             By default 'resque' is used.
293              
294             =head2 failures
295              
296             Failures handler. See L<Resque::Failures>.
297              
298             =head1 METHODS
299              
300             =head2 worker
301              
302             Returns a new L<Resque::Worker> on this resque instance.
303             It can have plugin/roles applied. See L<Resque::Pluggable>.
304              
305             my $worker = $r->worker();
306              
307             =head2 push
308              
309             Pushes a job onto a queue. Queue name should be a string and the
310             item should be a Resque::Job object or a hashref containing:
311              
312             class - The String name of the job class to run.
313             args - Any arrayref of arguments to pass the job.
314              
315             Returns redis response.
316              
317             Example
318              
319             $resque->push( archive => { class => 'Archive', args => [ 35, 'tar' ] } )
320              
321             =head2 pop
322              
323             Pops a job off a queue. Queue name should be a string.
324              
325             Returns a Resque::Job object, or nothing when the queue is empty.
326              
327             my $resque_job = $r->pop( 'queue_name' );
328              
329             =head2 blpop
330              
331             Pops a job off an arrayref of queues prioritizing by order. Queue names should be string.
332             It will block until a job is popped or the optional timeout (in seconds) expires.
333              
334             Returns a Resque::Job object, or nothing when the timeout expires without a job.
335              
336             my $resque_job = $r->blpop( [qw/ queue1 queue2 queue3/], 60 );
337              
338             =head2 size
339              
340             Returns the size of a queue.
341             Queue name should be a string.
342              
343             my $size = $r->size( 'queue_name' );
344              
345             =head2 size_map
346              
347             Returns a hashref with the size of an arrayref of queues.
348             Queue names should be strings.
349              
350             my $sizes = $r->size_map([qw/ queue1 queue2 queue3 /]);
351              
352             This method is very fast as it will pipeline all operations.
353              
354             =head2 peek
355              
356             Returns an array of jobs currently queued, or an arrayref in scalar context.
357              
358             The first argument is the queue name; the optional second and third
359             arguments are start and count values that can be used for pagination.
360             start is the item to begin, count is how many items to return.
361              
362             Passing a negative count argument will set a stop value instead
363             of count. So, passing -1 will return full list, -2 all but last
364             element and so on.
365              
366             To get the 3rd page of a 30 item, paginated list one would use:
367              
368             my @jobs = $resque->peek('my_queue', 60, 30)
369              
370             =head2 queues
371              
372             Returns an array of all known Resque queues, or an arrayref in scalar context.
373              
374             my @queues = $r->queues();
375              
376             =head2 remove_queue
377              
378             Given a queue name, completely deletes the queue.
379              
380             $r->remove_queue( 'my_queue' );
381              
382             =head2 create_queue
383              
384             Given a queue name, creates an empty queue.
385              
386             $r->create_queue( 'my_queue' );
387              
388             =head2 mass_dequeue
389              
390             Removes all matching jobs from a queue. Expects a hashref
391             with queue name, a class name, and, optionally, args.
392              
393             Returns the number of jobs destroyed.
394              
395             If no args are provided, it will remove all jobs of the class
396             provided.
397              
398             That is, for these two jobs:
399              
400             { 'class' => 'UpdateGraph', 'args' => ['perl'] }
401             { 'class' => 'UpdateGraph', 'args' => ['ruby'] }
402              
403             The following call will remove both:
404              
405             my $num_removed = $resque->mass_dequeue({
406             queue => 'test',
407             class => 'UpdateGraph'
408             });
409              
410             Whereas specifying args will only remove the 2nd job:
411              
412             my $num_removed = $resque->mass_dequeue({
413             queue => 'test',
414             class => 'UpdateGraph',
415             args => ['ruby']
416             });
417              
418             Using this method without args can be potentially very slow and
419             memory intensive, depending on the size of your queue, as it loads
420             all jobs into an array before processing.
421              
422             =head2 new_job
423              
424             Build a L<Resque::Job> object on this system for the given
425             hashref or string(payload for object).
426              
427             L<Resque::Job> class can be extended thru roles/plugins.
428             See L<Resque::Pluggable>.
429              
430             $r->new_job( $job_or_job_hashref );
431              
432             =head2 key
433              
434             Concatenate $self->namespace with the received array of names
435             to build a redis key name for this resque instance.
436              
437             =head2 keys
438              
439             Returns an array of all known Resque keys in Redis, or an arrayref in scalar context.
440             Redis' KEYS operation is O(N) for the keyspace, so be careful this can be slow for
441             big databases.
442              
443             =head2 flush_namespace
444              
445             This method will delete every trace of this Resque system on
446             the redis() backend.
447              
448             $r->flush_namespace();
449              
450             =head2 list_range
451              
452             Does the dirty work of fetching a range of items from a Redis list.
453              
454             my $items_ref = $r->list_range( $key, $start, $count );
455              
456             =head1 Queue manipulation
457              
458             =head1 HELPER METHODS
459              
460             =head1 BUGS
461              
462             As in any piece of software there might be bugs around.
463             If you find one, please report it on RT or at the github repo:
464              
465             L<https://github.com/diegok/resque-perl>
466              
467             Pull requests are also very welcome, but please include tests demonstrating
468             what you've fixed.
469              
470             =head1 TODO
471              
472             =over 4
473              
474             =item *
475              
476             More tests on worker fork and signal handling.
477              
478             =back
479              
480             =head1 SEE ALSO
481              
482             =over 4
483              
484             =item *
485              
486             L<Gearman::Client>
487              
488             =item *
489              
490             L<TheSchwartz>
491              
492             =item *
493              
494             L<Queue::Q4M>
495              
496             =back
497              
498             =head1 AUTHOR
499              
500             Diego Kuperman <diego@freekeylabs.com>
501              
502             =head1 COPYRIGHT AND LICENSE
503              
504             This software is copyright (c) 2021 by Diego Kuperman.
505              
506             This is free software; you can redistribute it and/or modify it under
507             the same terms as the Perl 5 programming language system itself.
508              
509             =cut