File Coverage

blib/lib/Redis/Queue.pm
Criterion Covered Total %
statement 6 90 6.6
branch 0 30 0.0
condition 0 7 0.0
subroutine 2 12 16.6
pod 8 8 100.0
total 16 147 10.8


line stmt bran cond sub pod time code
1             require 5.008_001;
2              
3             package Redis::Queue;
4              
5 3     3   204497 use warnings;
  3         6  
  3         108  
6 3     3   16 use strict;
  3         6  
  3         4411  
7              
8             =head1 NAME
9              
10             Redis::Queue - Simple work queue using Redis
11              
12             =head1 VERSION
13              
14             Version 0.01
15              
16             =cut
17              
18             our $VERSION = '0.01';
19              
20             =head1 SYNOPSIS
21              
22             Simple work queue using Redis, tries not to lose things when processes die.
23              
24             Workers should call receiveMessage to get a unit of work, and deleteMessage once
25             the work is completed. If the message isn't deleted within a given timeout,
26             other workers can retrieve the message again.
27              
28             The queue object should be kept around for a while, because of the 'id' state
29             that it keeps when generating new entries. If you have concerns about the
30             redis connection dropping, pass a constructor as the $redis parameter instead
31             of a connection.
32              
33             use Redis::Queue;
34              
35             my $foo = Redis::Queue->new();
36             ...
37              
38             =head1 CONSTRUCTOR
39              
40             =head2 new
41              
42             Required parameters:
43             redis => handle to Redis || coderef to generate a handle to Redis
44             queue => name for queue
45             Optional parameters:
46             timeout => length of time (in seconds) to treat received messages as reserved
47              
48             =cut
49              
50             sub new {
51 0     0 1   my $class = shift;
52 0           my $self = {@_};
53              
54 0 0         $class = ref($class) if ref($class);
55 0           bless($self, $class);
56              
57 0 0 0       if ($self->{redis} and ref $self->{redis} eq 'CODE') {
58 0           $self->{redis_constructor} = delete $self->{redis};
59 0           $self->{redis} = $self->{redis_constructor}->();
60             }
61              
62 0 0         $self->{redis} or die "Missing handle to redis\n";
63 0 0         $self->{queue} or die "Missing name for queue\n";
64 0   0       $self->{timeout} ||= 300;
65 0           return $self;
66             }
67              
68             =head1 THREADSAFE METHODS
69              
70             Atomic thread-safe methods.
71              
72             =head2 sendMessage
73              
74             Put a message on the queue.
75             Returns the generated message id.
76              
77             =cut
78             sub sendMessage {
79 0     0 1   my $self = shift;
80 0           my $message = shift;
81              
82 0           my $base = $self->_queue_base($self->{queue});
83              
84             # used for making multiple sends in a second unique
85 0           our $unique;
86 0           my $id = ++$unique;
87 0           my $key = join('.', time(), $$, $id);
88              
89 0           $self->_call_redis('set', "$base:value:$key", $message);
90 0           $self->_call_redis('set', "$base:fetched:$key", 0);
91 0           $self->_call_redis('lpush',"$base:primary", $key);
92 0           return $key;
93             }
94              
95             =head2 receiveMessage
96              
97             Get a message from the queue.
98             Returns (id,value). You must use the id to delete the message when done.
99              
100             =cut
101             sub receiveMessage {
102 0     0 1   my $self = shift;
103              
104 0           my $base = $self->_queue_base($self->{queue});
105 0           my $threshold = time() - $self->{timeout};
106              
107             # Find out (approximately) how long the list is.
108             # Sure, it could change while we're walking the list,
109             # but this is just to keep us from walking forever.
110 0           my $count = $self->_call_redis('llen', "$base:primary");
111 0           while ($count--) {
112             # Iterate through all the keys.
113             # It doesn't matter if we miss a couple because other workers are grabbing them...
114             # that just means that somebody else will do the work.
115 0           my $key = $self->_call_redis('rpoplpush', "$base:primary", "$base:primary");
116              
117             # Quit if there aren't any keys left.
118 0 0         return unless $key;
119              
120             # Check the timestamp, to make sure nobody else is processing the message.
121 0           my $now = time();
122 0           my $fetched = $self->_call_redis('getset', "$base:fetched:$key", $now);
123 0 0         if ($fetched < $threshold) {
124 0           my $message = $self->_call_redis('get', "$base:value:$key");
125 0           return ($key, $message);
126             }
127              
128             # Restore the original fetched timestamp (if different from what we put in).
129             # The conditional is important if there's a bunch of workers hammering the queue.
130 0 0         $self->_call_redis('set', "$base:fetched:$key", $fetched) if $fetched < $now;
131             }
132              
133             # Didn't find anything workable in the queue. Oh, well.
134 0           return;
135             }
136              
137             =head2 deleteMessage
138              
139             Delete a message from the queue by id.
140              
141             =cut
142             sub deleteMessage {
143 0     0 1   my $self = shift;
144 0           my $key = shift;
145              
146 0           my $base = $self->_queue_base($self->{queue});
147 0           $self->_call_redis('lrem', "$base:primary", 0, $key);
148 0           $self->_call_redis('del', "$base:fetched:$key");
149 0           $self->_call_redis('del', "$base:value:$key");
150             }
151              
152             =head1 NON-THREADSAFE METHODS
153              
154             These methods return results that may not accurately represent the state of
155             the queue by the time you read their results.
156              
157             =head2 length
158              
159             Get the length of the queue. It may have changed by the time you read it
160             but it's good for a general idea of how big the queue is.
161              
162             =cut
163             sub length {
164 0     0 1   my $self = shift;
165              
166 0           my $base = $self->_queue_base($self->{queue});
167              
168 0           return $self->_call_redis('llen', "$base:primary");
169             }
170              
171             =head2 nuke
172              
173             Delete all storage associated with the queue. Messy things may happen if
174             something else is trying to use the queue at the same time this runs. On the
175             other hand, it shouldn't be fatal, but still leaves the possibility of
176             leaving some stuff behind.
177              
178             =cut
179             sub nuke {
180 0     0 1   my $self = shift;
181              
182 0           my $base = $self->_queue_base($self->{queue});
183              
184 0           my @keys = $self->_call_redis('keys', "$base:*");
185              
186             # Do the primary first, to try to avoid issues if someone uses/recreates the queue while we're nuking it.
187 0           $self->_call_redis('del', "$base:primary");
188             # Nuke everything other than the primary.
189             # May still miss some entries if stuff was added between the keys listing and the nuking of the primary...
190 0           for my $key (grep($_ ne "$base:primary", @keys)) {
191 0           $self->_call_redis('del', $key);
192             }
193             }
194              
195             =head2 peekMessages
196              
197             Peek at some number of messages on the queue (defaults to 10). In particular,
198             if there are workers deleting entries, this may return fewer entries than
199             requested, even if there are more messages on the queue.
200              
201             =cut
202             sub peekMessages {
203 0     0 1   my $self = shift;
204 0   0       my $max = shift || 10;
205              
206 0           my $base = $self->_queue_base($self->{queue});
207              
208 0           my @result;
209 0           my @keys = $self->_call_redis('lrange', "$base:primary", 0, $max - 1);
210 0           for my $key (@keys) {
211 0           my $message = $self->_call_redis('get', "$base:value:$key");
212 0 0         push(@result, $message) if $message;
213             }
214 0           return @result;
215             }
216              
217             =head2 queues
218              
219             Get the list of queues hosted on the redis server.
220              
221             =cut
222             sub queues {
223 0     0 1   my $redis = shift;
224 0 0         $redis = shift if $redis eq 'Redis::Queue';
225 0 0         $redis = $redis->{redis} if ref($redis) eq 'Redis::Queue';
226              
227 0           my @queues;
228 0 0         if (@_) {
229 0           for my $pattern (@_) {
230 0           push(@queues, $redis->keys("queue:$pattern:primary"));
231             }
232             }
233             else {
234 0           push(@queues, $redis->keys("queue:*:primary"));
235             }
236              
237             s/queue:(.*):primary/$1/
238 0           for @queues;
239              
240 0           return @queues;
241             }
242              
243             =head1 PRIVATE METHODS
244              
245             Documentation here provided for developer reference.
246              
247             =head2 _queue_base
248              
249             Accessor method for the queue key-name prefix
250              
251             =cut
252             sub _queue_base {
253 0     0     my ($self, $queue) = @_;
254 0           return "queue:$queue";
255             }
256              
257             =head2 _call_redis
258              
259             Send a request to Redis
260              
261             =cut
262             sub _call_redis {
263 0     0     my ($self, $method, @args) = @_;
264              
265 0           my @return;
266 0           for (1..3) {
267 0           @return = eval {
268 0           return $self->{redis}->$method(@args);
269             };
270 0 0         last unless $@;
271              
272 0           warn "Error while calling redis: $@";
273              
274 0 0         if ($_ < 3) {
275 0 0         if ($self->{redis_constructor}) {
276 0           $self->{redis} = $self->{redis_constructor}->();
277             }
278             else {
279 0           die "No constructor, can't reconnect to redis.\n";
280             }
281             } else {
282 0           die "ETOOMANYERRORS\n";
283             }
284             }
285              
286 0 0         return wantarray ? @return : $return[0];
287             }
288              
289             =head1 AUTHOR
290              
291             Alex Popiel
292              
293             =head1 BUGS
294              
295             Please report any bugs or feature requests to C, or through
296             the web interface at L. I will be notified, and then you'll
297             automatically be notified of progress on your bug as I make changes.
298              
299             =head1 SUPPORT
300              
301             You can find documentation for this module with the perldoc command.
302              
303             perldoc Redis::Queue
304              
305             You can also look for information at:
306              
307             =over 4
308              
309             =item * GitHub
310              
311             L
312              
313             =item * AnnoCPAN: Annotated CPAN documentation
314              
315             =back
316              
317             =head1 ACKNOWLEDGEMENTS
318              
319             Thank you to Marchex for allowing time to be spent
320             developing and maintaining this library.
321             Thanks also to Chris Petersen for major assistance in packaging of this library.
322              
323             =head1 LICENSE AND COPYRIGHT
324              
325             Copyright 2011 Alex Popiel.
326              
327             This program is free software; you can redistribute it and/or modify it
328             under the terms of the Artistic License version 2.0.
329              
330             See http://www.perlfoundation.org/artistic_license_2_0 for more information.
331              
332             =cut
333              
334             1; # End of Redis::Queue