File Coverage

blib/lib/Queue/Q/ReliableFIFO/Redis.pm
Criterion Covered Total %
statement 33 422 7.8
branch 0 168 0.0
condition 0 112 0.0
subroutine 11 47 23.4
pod 23 24 95.8
total 67 773 8.6


line stmt bran cond sub pod time code
1             package Queue::Q::ReliableFIFO::Redis;
2 1     1   86196 use strict;
  1         2  
  1         28  
3 1     1   7 use warnings;
  1         1  
  1         29  
4 1     1   5 use Carp qw(croak cluck);
  1         2  
  1         53  
5              
6 1     1   665 use parent 'Queue::Q::ReliableFIFO';
  1         279  
  1         5  
7 1     1   624 use Queue::Q::ReliableFIFO::Item;
  1         3  
  1         29  
8 1     1   622 use Queue::Q::ReliableFIFO::Lua;
  1         3  
  1         36  
9 1     1   9 use Redis;
  1         2  
  1         26  
10 1     1   5 use Time::HiRes qw(usleep);
  1         2  
  1         8  
11              
12             use Class::XSAccessor {
13 1         10 getters => [qw(
14             server
15             port
16             db
17             queue_name
18             busy_expiry_time
19             claim_wait_timeout
20             requeue_limit
21             redis_conn
22             redis_options
23             warn_on_requeue
24             _main_queue
25             _busy_queue
26             _failed_queue
27             _time_queue
28             _temp_queue
29             _log_queue
30             _script_cache
31             _lua
32             )],
33             setters => {
34             set_requeue_limit => 'requeue_limit',
35             set_busy_expiry_time => 'busy_expiry_time',
36             set_claim_wait_timeout => 'claim_wait_timeout',
37             }
38 1     1   239 };
  1         2  
39              
40             my %QueueType = map { $_ => undef } (qw(main busy failed time temp log));
41              
42             sub new {
43 0     0 1   my ($class, %params) = @_;
44 0           for (qw(server port queue_name)) {
45             croak("Need '$_' parameter")
46 0 0         if not defined $params{$_};
47             }
48              
49 0           my %AllowedNewParams = map { $_ => undef } (qw(
  0            
50             server port db queue_name busy_expiry_time
51             claim_wait_timeout requeue_limit redis_conn redis_options
52             warn_on_requeue));
53 0           for (keys %params) {
54             croak("Invalid parameter '$_'")
55 0 0         if not exists $AllowedNewParams{$_};
56             }
57              
58 0           my $self = bless({
59             requeue_limit => 5,
60             busy_expiry_time => 30,
61             claim_wait_timeout => 1,
62             db => 0,
63             warn_on_requeue => 0,
64             %params
65             } => $class);
66             $self->{"_$_" . '_queue'} = $params{queue_name} . "_$_"
67 0           for keys %QueueType;
68              
69 0   0       $self->{redis_options} ||= { reconnect => 60 };
70             $self->{redis_conn} ||= Redis->new(
71             # by default, auto-reconnect during 60 seconds
72 0   0       %{$self->{redis_options}},
  0            
73             encoding => undef, # force undef for binary data
74             server => join(":", $self->server, $self->port),
75             );
76              
77 0 0         $self->redis_conn->select($self->db) if $self->db;
78              
79             $self->{_lua}
80 0           = Queue::Q::ReliableFIFO::Lua->new(redis_conn => $self->redis_conn);
81              
82 0           return $self;
83             }
84              
85             sub clone {
86 0     0 1   my ($class, $org, %params) = @_;
87 0           my %default = map { $_ => $org->{$_} }
  0            
88             grep m/^[a-zA-Z]/,
89             keys %$org;
90 0           return $class->new(%default, %params);
91             }
92              
93             sub enqueue_item {
94 0     0 1   my $self = shift;
95 0 0         return if not @_;
96              
97             return $self->redis_conn->lpush(
98             $self->_main_queue,
99 0           map { Queue::Q::ReliableFIFO::Item->new(data => $_)->_serialized } @_
  0            
100             );
101             }
102              
103 1     1   1387 use constant NONBLOCKING => 0;
  1         2  
  1         71  
104 1     1   5 use constant BLOCKING => 1;
  1         3  
  1         4677  
105              
106             sub claim_item {
107 0     0 1   my ($self, $n) = @_;
108 0           return $self->_claim_item_internal($n, BLOCKING);
109             }
110              
111             sub claim_item_nonblocking {
112 0     0 1   my ($self, $n) = @_;
113 0           return $self->_claim_item_internal($n, NONBLOCKING);
114             }
115              
116             sub _claim_item_internal {
117 0     0     my ($self, $n, $doblocking) = @_;
118 0   0       $n ||= 1;
119 0           my $timeout = $self->claim_wait_timeout;
120 0 0         if ($n == 1) {
121             # rpoplpush gives higher throughput than the blocking version
122             # (i.e. brpoplpush). So use the blocked version only when we
123             # need to wait.
124 0           my $value;
125 0           $value = $self->redis_conn->rpoplpush($self->_main_queue, $self->_busy_queue);
126 0 0 0       if (not defined($value) and $doblocking == BLOCKING) {
127 0           $value = $self->redis_conn->brpoplpush($self->_main_queue, $self->_busy_queue, $timeout);
128             }
129 0 0         return if not $value;
130 0           my $item;
131 0           eval { ($item) = Queue::Q::ReliableFIFO::Item->new(_serialized => $value); };
  0            
132             # FIXME ignoring exception in eval{}!
133 0           return $item;
134             }
135             else {
136 0           my $conn = $self->redis_conn;
137 0           my $qn = $self->_main_queue;
138 0           my $bq = $self->_busy_queue;
139 0           my @items;
140             my $serial;
141 0 0         if ($n > 30) {
142             # yes, there is a race, but it's an optimization only
143 0           my ($l) = $self->redis_conn->llen($qn);
144 0 0         $n = $l if $l < $n;
145             }
146             eval {
147             $conn->rpoplpush($qn, $bq, sub {
148 0 0   0     if (defined $_[0]) {
149 0           push @items,
150             Queue::Q::ReliableFIFO::Item->new(_serialized => $_[0])
151             }
152 0           }) for 1..$n;
153 0           $conn->wait_all_responses;
154 0 0 0       if (@items == 0 && $doblocking == BLOCKING) {
155             # list seems empty, use the blocking version
156 0           $serial = $conn->brpoplpush($qn, $bq, $timeout);
157 0 0         if (defined $serial) {
158 0           push(@items,
159             Queue::Q::ReliableFIFO::Item->new(_serialized => $serial));
160 0           undef $serial;
161             $conn->rpoplpush($qn, $bq, sub {
162 0 0   0     if (defined $_[0]) {
163 0           push @items,
164             Queue::Q::ReliableFIFO::Item->new(
165             _serialized => $_[0]);
166             }
167 0           }) for 1 .. ($n-1);
168 0           $conn->wait_all_responses;
169             }
170             }
171 0           1;
172             }
173 0 0         or do {
174 0           return @items; # return with whatever we have...
175             };
176 0           return @items;
177             }
178             }
179              
180             sub mark_item_as_done {
181 0     0 1   my $self = shift;
182 0 0         if (@_ == 1) {
183 0           return $self->redis_conn->lrem(
184             $self->_busy_queue, -1, $_[0]->_serialized);
185             }
186             else {
187             # TODO since lrem is an O(n) operation in size of busy list,
188             # there's a crossover point at which having l items to remove
189             # from said list is better done in a single O(n) loop through
190             # the list (in Lua?) rather than in l*O(n)=O(ln) operations via
191             # _lrem!
192 0           my $conn = $self->redis_conn;
193 0           my $count = 0;
194             $conn->lrem(
195 0     0     $self->_busy_queue, -1, $_->_serialized, sub { $count += $_[0] })
196 0           for @_;
197 0           $conn->wait_all_responses;
198 0           return $count;
199             }
200             }
201              
202             sub unclaim {
203 0     0 1   my $self = shift;
204 0           return $self->__requeue_busy(1, undef, @_);
205             }
206              
207             sub requeue_busy_item {
208 0     0 1   my ($self, $raw) = @_;
209 0           return $self->__requeue_busy(0, undef, $raw);
210             }
211              
212             sub requeue_busy {
213 0     0 1   my $self = shift;
214 0           return $self->__requeue_busy(0, undef, @_);
215             }
216              
217             sub requeue_busy_error {
218 0     0 0   my $self = shift;
219 0           my $error= shift;
220 0           return $self->__requeue_busy(0, $error, @_);
221             }
222              
223             sub __requeue_busy {
224 0     0     my $self = shift;
225 0           my $place = shift; # 0: producer side, 1: consumer side
226 0           my $error = shift; # error message
227 0           my $n = 0;
228             eval {
229             $n += $self->_lua->call(
230             'requeue_busy',
231             3,
232             $self->_busy_queue,
233             $self->_main_queue,
234             $self->_failed_queue,
235             time(),
236             $_->_serialized,
237             $self->requeue_limit,
238             $place,
239             $error || '',
240 0   0       ) for @_;
241 0           1;
242             }
243 0 0         or do {
244 0           cluck("Lua call went wrong! $@");
245             };
246 0           return $n;
247             }
248              
249             sub requeue_failed_item {
250             #
251             # **deprecated ***
252             # This can stress Redis very hard when there are many failed items.
253             # The lrem operation does a scan. If the item is not
254             # at the position where the lrem-search start, the scan goes on.
255             # A sleep is added in case the method is called for multiple items.
256             #
257 0     0 1   my $self = shift;
258 0           my $n = 0;
259             eval {
260 0           for (@_) {
261 0           $n += $self->_lua->call(
262             'requeue_failed_item',
263             2,
264             $self->_failed_queue,
265             $self->_main_queue,
266             time(),
267             $_->_serialized,
268             );
269 0           usleep(1e5);
270             }
271 0           1;
272             }
273 0 0         or do {
274 0           cluck("Lua call went wrong! $@");
275             };
276 0           return $n;
277             }
278              
279             sub requeue_failed_items {
280 0     0 1   my $self = shift;
281 0 0         if (@_ == 1) {
282             # old API
283 0           my $limit = shift;
284 0           my $n = $self->_lua->call(
285             'requeue_failed',
286             2,
287             $self->_failed_queue,
288             $self->_main_queue,
289             time(),
290             $limit
291             );
292 0 0         if (!defined $n) {
293 0           cluck("Lua call went wrong! $@");
294             }
295 0           return $n;
296             }
297 0           my %options = @_;
298             # delay: how long before trying again after a (temporary) fail
299 0   0       my $delay = delete $options{Delay} || 0;
300 0   0       my $max_fc = delete $options{MaxFailCount} || -1;
301 0   0       my $chunk = delete $options{Chunk} || 100;
302 0           cluck("Invalid option: $_") for (keys %options);
303              
304 0           my $total_requeued = 0;
305 0 0         if ($self->queue_length('failed') > 0) {
306 0           my ($todo, $requeued) = (0,0);
307 0           do {
308 0           ($todo, $requeued) = split(/\s+/, $self->_lua->call(
309             'requeue_failed_gentle',
310             3,
311             $self->_failed_queue,
312             $self->_main_queue,
313             $self->_temp_queue,
314             time(),
315             $chunk,
316             $delay,
317             $max_fc,
318             ));
319 0           $total_requeued += $requeued;
320 0           usleep(1e5);
321             }
322             while($todo > 0);
323             }
324 0           return $total_requeued;
325             }
326              
327             sub get_and_flush_failed_items {
328             # depreacted, use remove_failed_items
329 0     0 1   my ($self, %options) = @_;
330 0           my (undef, $failures) = $self->remove_failed_items(%options);
331 0           return @$failures;
332             }
333              
334             sub remove_failed_items {
335 0     0 1   my ($self, %options) = @_;
336 0   0       my $min_age = delete $options{MinAge} || 0;
337 0   0       my $min_fc = delete $options{MinFailCount} || 0;
338 0   0       my $chunk = delete $options{Chunk} || 100;
339 0   0       my $loglimit= delete $options{LogLimit} || 100;
340 0           cluck("Invalid option: $_") for (keys %options);
341              
342 0           my $total_removed= 0;
343 0 0         if ($self->queue_length('failed') > 0) {
344 0           my ($todo, $removed) = (0,0);
345 0           do {
346 0           my $now = time();
347 0           ($todo, $removed) = split(/\s+/, $self->_lua->call(
348             'remove_failed_gentle',
349             3,
350             $self->_failed_queue,
351             $self->_temp_queue,
352             $self->_log_queue,
353             $now,
354             $chunk,
355             ($now - $min_age),
356             $min_fc,
357             $loglimit,
358             ));
359 0           $total_removed += $removed;
360 0           usleep(1e5);
361             }
362             while($todo > 0);
363             }
364 0 0         return (0,[])
365             if $total_removed == 0;
366              
367 0           my $conn = $self->redis_conn;
368             my @serial =
369 0           map { Queue::Q::ReliableFIFO::Item->new(_serialized => $_) }
  0            
370             $conn->lrange($self->_log_queue, 0, -1);
371 0           $conn->del($self->_log_queue);
372 0           return ($total_removed, \@serial);
373             }
374              
375             sub flush_queue {
376 0     0 1   my $self = shift;
377 0           my $conn = $self->redis_conn;
378 0           $conn->multi;
379             $conn->del($_)
380 0           for ($self->_main_queue, $self->_busy_queue,
381             $self->_failed_queue, $self->_time_queue);
382 0           $conn->exec;
383 0           return;
384             }
385              
386             sub queue_length {
387 0     0 1   my ($self, $type) = @_;
388 0           __validate_type(\$type);
389 0           my $qn = $self->queue_name . "_$type";
390 0           my ($len) = $self->redis_conn->llen($qn);
391 0           return $len;
392             }
393              
394             sub peek_item {
395 0     0 1   my ($self, $type) = @_;
396             # this function returns the value of oldest item in the queue
397 0           __validate_type(\$type);
398 0           my $qn = $self->queue_name . "_$type";
399              
400             # take oldest item
401 0           my ($serial) = $self->redis_conn->lrange($qn,-1,-1);
402 0 0         return undef if ! $serial; # empty queue
403              
404 0           my $item = Queue::Q::ReliableFIFO::Item->new(_serialized => $serial);
405 0           return $item->data();
406             }
407              
408             sub age {
409 0     0 1   my ($self, $type) = @_;
410             # this function returns age of oldest item in the queue (in seconds)
411 0           __validate_type(\$type);
412 0           my $qn = $self->queue_name . "_$type";
413              
414             # take oldest item
415 0           my ($serial) = $self->redis_conn->lrange($qn,-1,-1);
416 0 0         return 0 if ! $serial; # empty queue, so age 0
417              
418 0           my $item = Queue::Q::ReliableFIFO::Item->new(_serialized => $serial);
419 0           return time() - $item->time_queued;
420             }
421              
422             sub raw_items_main {
423 0     0 1   my $self = shift;
424 0           return $self->_raw_items('main', @_);
425             }
426              
427             sub raw_items_busy {
428 0     0 1   my $self = shift;
429 0           return $self->_raw_items('busy', @_);
430             }
431              
432             sub raw_items_failed {
433 0     0 1   my $self = shift;
434 0           return $self->_raw_items('failed', @_);
435             }
436              
437             sub _raw_items {
438 0     0     my ($self, $type, $n) = @_;
439             #__validate_type(\$type); # truism, cf. the ten lines above this
440 0   0       $n ||= 0;
441 0           my $qn = $self->queue_name . "_$type";
442             return
443 0           map { Queue::Q::ReliableFIFO::Item->new(_serialized => $_); }
  0            
444             $self->redis_conn->lrange($qn, -$n, -1);
445             }
446              
447             sub __validate_type {
448 0     0     my $type = shift;
449 0   0       $$type ||= 'main';
450             croak("Unknown queue type $$type")
451 0 0         if not exists $QueueType{$$type};
452             }
453              
454             sub memory_usage_perc {
455 0     0 1   my $self = shift;
456 0           my $conn = $self->redis_conn;
457 0           my $info = $conn->info('memory');
458 0           my $mem_used = $info->{used_memory};
459 0           my (undef, $mem_avail) = $conn->config('get', 'maxmemory');
460 0 0         return 100 if $mem_avail == 0; # if nothing is available, it's full!
461 0           return $mem_used * 100 / $mem_avail;
462             }
463              
464              
465             SCOPE: {
466             my %ValidErrorActions = map { $_ => 1 } (qw(drop requeue));
467             my %ValidOptions = map { $_ => 1 } (qw(
468             Chunk DieOnError ReturnOnDie MaxItems MaxSeconds ProcessAll Pause ReturnWhenEmpty NoSigHandlers WarnOnError
469             ));
470              
471             sub consume {
472 0     0 1   my ($self, $callback, $error_action, $options) = @_;
473             # validation of input
474 0   0       $error_action ||= 'requeue';
475             croak("Unknown error action")
476 0 0         if not exists $ValidErrorActions{$error_action};
477             my %error_subs = (
478 0     0     'drop' => sub { my ($self, $item) = @_;
479 0           $self->mark_item_as_done($item); },
480 0     0     'requeue' => sub { my ($self, $item, $error) = @_;
481 0           $self->requeue_busy_error($error, $item); },
482 0           );
483 0   0       my $onerror = $error_subs{$error_action}
484             || croak("no handler for $error_action");
485              
486 0 0         $options = $options ? {%$options} : {};
487 0   0       my $chunk = delete $options->{Chunk} || 1;
488 0 0         croak("Chunk should be a number > 0") if (! $chunk > 0);
489             cluck("DieOnError is deprecated, use ReturnOnDie instead")
490 0 0         if exists $options->{DieOnError};
491 0   0       my $return = delete $options->{ReturnOnDie} || delete $options->{DieOnError} || 0;
492 0   0       my $maxitems = delete $options->{MaxItems} || -1;
493 0   0       my $maxseconds = delete $options->{MaxSeconds} || 0;
494 0   0       my $pause = delete $options->{Pause} || 0;
495 0   0       my $process_all = delete $options->{ProcessAll} || 0;
496 0   0       my $return_when_empty= delete $options->{ReturnWhenEmpty} || 0;
497 0   0       my $nohandlers = delete $options->{NoSigHandlers} || 0;
498 0   0       my $warn_on_error = delete $options->{WarnOnError} || 0;
499 0 0 0       croak("Option ProcessAll without Chunk does not make sense")
500             if $process_all && $chunk <= 1;
501 0 0 0       croak("Option Pause without Chunk does not make sense")
502             if $pause && $chunk <= 1;
503              
504 0           for (keys %$options) {
505 0 0         croak("Unknown option $_") if not exists $ValidOptions{$_};
506             }
507 0 0         my $stop_time = $maxseconds > 0 ? time() + $maxseconds : 0;
508              
509             # Now we can start...
510 0           my $stop = 0;
511 0           my $MAX_RECONNECT = 60;
512 0 0         my $sigint = ref $SIG{INT} eq 'CODE' ? $SIG{INT} : undef;
513 0 0         my $sigterm = ref $SIG{TERM} eq 'CODE' ? $SIG{TERM} : undef;
514             local $SIG{INT} = $nohandlers ? $sigint : sub {
515 0     0     print "stopping\n";
516 0           $stop = 1;
517 0 0         &$sigint if $sigint;
518 0 0         };
519             local $SIG{TERM} = $nohandlers ? $sigterm : sub {
520 0     0     print "stopping\n";
521 0           $stop = 1;
522 0 0         &$sigterm if $sigterm;
523 0 0         };
524              
525 0 0         if ($chunk == 1) {
526 0           my $die_afterwards = 0;
527 0           my $claimed_count = 0;
528 0           my $done_count = 0;
529 0           while(!$stop) {
530 0           my $item = eval { $self->claim_item(); };
  0            
531 0 0         if (!$item) {
532 0 0 0       last if $return_when_empty
      0        
533             || ($stop_time > 0 && time() >= $stop_time);
534 0           next; # nothing claimed this time, try again
535             }
536 0           $claimed_count++;
537 0           my $ok = eval { $callback->($item->data); 1; };
  0            
  0            
538 0 0         if (!$ok) {
539 0           my $error = _clean_error($@);
540 0 0 0       warn "callback had an error: $error"
541             if $warn_on_error and $error;
542 0           for (1 .. $MAX_RECONNECT) { # retry if connection is lost
543 0           eval { $onerror->($self, $item, $error); 1; }
  0            
544 0 0         or do {
545 0 0         last if $stop;
546 0           sleep 1;
547 0           next;
548             };
549 0           last;
550             }
551 0 0         if ($return) {
552 0           $stop = 1;
553 0           cluck("Stopping because of ReturnOnDie\n");
554             }
555             } else {
556 0           for (1 .. $MAX_RECONNECT) { # retry if connection is lost
557             eval {
558 0           $done_count += $self->mark_item_as_done($item);
559 0           1;
560 0 0         } or do {
561 0 0         last if $stop;
562 0           sleep 1;
563 0           next;
564             };
565 0           last;
566             }
567             }
568 0 0 0       $stop = 1 if ($maxitems > 0 && --$maxitems == 0)
      0        
      0        
569             || ($stop_time > 0 && time() >= $stop_time);
570             }
571 0           my $still_busy = $claimed_count - $done_count;
572 0 0 0       warn "not all items removed from busy queue ($still_busy)\n"
573             if $self->warn_on_requeue && $still_busy;
574             }
575             else {
576 0           my $die_afterwards = 0;
577 0           my $t0 = Time::HiRes::time();
578 0           while(!$stop) {
579 0           my @items;
580              
581             # give queue some time to grow
582 0 0         if ($pause) {
583 0           my $pt = ($pause - (Time::HiRes::time()-$t0))*1e6;
584 0 0         Time::HiRes::usleep($pt) if $pt > 0;
585             }
586              
587 0           eval { @items = $self->claim_item($chunk); 1; }
  0            
588 0 0         or do {
589 0           print "error with claim\n";
590             };
591 0 0         $t0 = Time::HiRes::time() if $pause; # only relevant for pause
592 0 0         if (@items == 0) {
593 0 0 0       last if $return_when_empty
      0        
594             || ($stop_time > 0 && time() >= $stop_time);
595 0           next; # nothing claimed this time, try again
596             }
597 0           my @done;
598 0 0         if ($process_all) {
599             # process all items in one call (option ProcessAll)
600 0           my $ok = eval { $callback->(map { $_->data } @items); 1; };
  0            
  0            
  0            
601 0 0         if ($ok) {
602 0           @done = splice @items;
603             }
604             else {
605             # we need to call onerror for all items now
606 0           @done = (); # consider all items failed
607 0           my $error = _clean_error($@);
608 0 0 0       warn "callback had an error: $error"
609             if $warn_on_error and $error;
610 0           while (my $item = shift @items) {
611 0           for (1 .. $MAX_RECONNECT) {
612 0           eval { $onerror->($self, $item, $error); 1; }
  0            
613 0 0         or do {
614 0 0         last if $stop; # items might stay in busy mode
615 0           sleep 1;
616 0           next;
617             };
618 0           last;
619             }
620 0 0         if ($return) {
621 0           cluck("Stopping because of ReturnOnDie\n");
622 0           $stop = 1;
623             }
624 0 0         last if $stop;
625             }
626             }
627             }
628             else {
629             # normal case: process items one by one
630 0           while (my $item = shift @items) {
631 0           my $ok = eval { $callback->($item->data); 1; };
  0            
  0            
632 0 0         if ($ok) {
633 0           push @done, $item;
634             }
635             else {
636 0           my $error = _clean_error($@);
637 0 0 0       warn "callback had an error: $error"
638             if $warn_on_error and $error;
639             # retry if connection is lost
640 0           for (1 .. $MAX_RECONNECT) {
641 0           eval { $onerror->($self, $item, $error); 1; }
  0            
642 0 0         or do {
643 0 0         last if $stop;
644 0           sleep 1;
645 0           next;
646             };
647 0           last;
648             }
649 0 0         if ($return) {
650 0           cluck("Stopping because of ReturnOnDie\n");
651 0           $stop = 1;
652             }
653             }
654 0 0         last if $stop;
655             }
656             }
657 0           my $count = 0;
658 0           for (1 .. $MAX_RECONNECT) {
659 0           eval { $count += $self->mark_item_as_done(@done); 1; }
  0            
660 0 0         or do {
661 0 0         last if $stop;
662 0           sleep 1;
663 0           next;
664             };
665 0           last;
666             }
667 0 0 0       warn "not all items removed from busy queue ($count)\n"
668             if $self->warn_on_requeue && $count != @done;
669              
670             # put back the claimed but not touched items
671 0 0         if (@items > 0) {
672 0           my $n = @items;
673 0           print "unclaiming $n items\n";
674 0           for (1 .. $MAX_RECONNECT) {
675 0           eval { $self->unclaim($_) for @items; 1; }
  0            
676 0 0         or do {
677 0 0         last if $stop;
678 0           sleep 1;
679 0           next;
680             };
681 0           last;
682             }
683             }
684 0 0 0       $stop = 1 if ($maxitems > 0 && ($maxitems -= @done) <= 0)
      0        
      0        
685             || ($stop_time > 0 && time() >= $stop_time);
686             }
687             }
688             } # end 'sub consume'
689             } # end SCOPE
690              
691             sub _clean_error {
692 0     0     $_[0] =~ s/, line [0-9]+//;
693 0           chomp $_[0];
694 0           return $_[0];
695             }
696              
697             # methods to be used for cleanup script and Nagios checks
698             # the methods read or remove items from the busy queue
699             sub handle_expired_items {
700 0     0 1   my ($self, $timeout, $action) = @_;
701 0   0       $timeout ||= 10;
702 0 0         die "timeout should be a number> 0" if not int($timeout);
703 0 0 0       die "unknown action"
704             if not $action or $action !~ /^(?:requeue|drop)$/;
705 0           my $conn = $self->redis_conn;
706 0           my @serial = $conn->lrange($self->_busy_queue, 0, -1);
707 0           my $time = time;
708             my %timetable =
709 0           map { reverse split /-/,$_,2 }
  0            
710             $conn->lrange($self->_time_queue, 0, -1);
711 0           my @match = grep { exists $timetable{$_} } @serial;
  0            
712 0           my %match = map { $_ => undef } @match;
  0            
713 0           my @timedout = grep { $time - $timetable{$_} >= $timeout } @match;
  0            
714 0           my @log;
715              
716 0 0         if ($action eq 'requeue') {
    0          
717 0           for my $serial (@timedout) {
718 0           my $item = Queue::Q::ReliableFIFO::Item->new(
719             _serialized => $serial
720             );
721 0           my $n = $self->requeue_busy_item($item);
722 0 0         push @log, $item
723             if $n;
724             }
725             }
726             elsif ($action eq 'drop') {
727 0           for my $serial (@timedout) {
728 0           my $n = $conn->lrem( $self->_busy_queue, -1, $serial);
729 0 0         push @log, Queue::Q::ReliableFIFO::Item->new(_serialized => $serial)
730             if $n;
731             }
732             }
733              
734             # We create a new timetable. We take the original timetable and
735             # exclude:
736             # 1. the busy items which timed out and we just handled
737             # 2. timetable items which have no corresponding busy items anymore
738 0           my %timedout = map { $_ => undef } @timedout;
  0            
739 0           my %busy = map { $_ => undef } @serial;
  0            
740             my %newtimetable =
741 0           map { $_ => $timetable{$_} }
742 0           grep { exists $busy{$_} } # exclude (ad 2.)
743 0           grep { ! exists $timedout{$_} } # exclude (ad 1.)
  0            
744             keys %timetable; # original timetable
745             # put in the items of latest scan we did not see before
746             $newtimetable{$_} = $time
747 0           for (grep { !exists $newtimetable{$_} } @serial);
  0            
748 0           $conn->multi;
749 0           $conn->del($self->_time_queue);
750             $conn->lpush($self->_time_queue, join('-',$newtimetable{$_},$_))
751 0           for (keys %newtimetable);
752 0           $conn->exec;
753             #FIXME the log info should also show what is done with the items
754             # (e.g. dropped after requeue limit).
755 0           return @log;
756             }
757             1;
758              
759             __END__