File Coverage

blib/lib/Queue/Q/NaiveFIFO/Redis.pm
Criterion Covered Total %
statement 27 72 37.5
branch 0 16 0.0
condition 0 10 0.0
subroutine 9 19 47.3
pod 7 7 100.0
total 43 124 34.6


line stmt bran cond sub pod time code
1             package Queue::Q::NaiveFIFO::Redis;
2 3     3   175983 use strict;
  3         6  
  3         79  
3 3     3   17 use warnings;
  3         3  
  3         80  
4 3     3   15 use Carp qw(croak);
  3         5  
  3         145  
5              
6 3     3   1044 use Queue::Q::NaiveFIFO;
  3         8  
  3         73  
7 3     3   2311 use parent 'Queue::Q::NaiveFIFO';
  3         868  
  3         19  
8              
9 3     3   1308 use Redis;
  3         134235  
  3         87  
10 3     3   74 use Sereal::Encoder;
  3         7  
  3         125  
11 3     3   17 use Sereal::Decoder;
  3         6  
  3         210  
12              
13             our $SerealEncoder;
14             our $SerealDecoder;
15              
16             use Class::XSAccessor {
17 3         67 getters => [qw(server port queue_name db _redis_conn)],
18 3     3   1377 };
  3         5440  
19              
20             sub new {
21 0     0 1   my ($class, %params) = @_;
22 0           for (qw(server port queue_name)) {
23             croak("Need '$_' parameter")
24 0 0         if not defined $params{$_};
25             }
26              
27             my $self = bless({
28 0           (map {$_ => $params{$_}} qw(server port queue_name) ),
29 0   0       db => $params{db} || 0,
30             _redis_conn => undef,
31             } => $class);
32              
33             $self->{_redis_conn} = Redis->new(
34 0 0         %{$params{redis_options} || {}},
  0            
35             encoding => undef, # force undef for binary data
36             server => join(":", $self->server, $self->port),
37             );
38              
39 0 0         $self->_redis_conn->select($self->db) if $self->db;
40              
41 0           return $self;
42             }
43              
44             sub enqueue_item {
45 0     0 1   my $self = shift;
46 0 0         croak("Need exactly one item to enqeue")
47             if not @_ == 1;
48 0           my ($blob) = $self->_serialize($_[0]);
49 0           $self->_redis_conn->lpush($self->queue_name, $blob);
50             }
51              
52             sub enqueue_items {
53 0     0 1   my $self = shift;
54 0 0         return if not @_;
55 0           my $qn = $self->queue_name;
56 0           my $conn = $self->_redis_conn;
57 0           my @blobs = $self->_serialize(@_);
58 0           $conn->lpush($qn, @blobs);
59             }
60              
61             sub claim_item {
62 0     0 1   my ($self) = @_;
63 0           my ($rv) = $self->_deserialize( $self->_redis_conn->rpop($self->queue_name) );
64 0           return $rv;
65             }
66              
67             sub claim_items {
68 0     0 1   my ($self, $n) = @_;
69 0   0       $n ||= 1;
70 0           my $conn = $self->_redis_conn;
71 0           my $qn = $self->queue_name;
72 0 0         if ($n > 100) {
73 0           my ($l) = $self->_redis_conn->llen($qn);
74 0 0         $n = $l if $l < $n;
75             }
76 0           my @elem;
77 0     0     $conn->rpop($qn, sub {push @elem, $_[0]}) for 1..$n;
  0            
78 0           $conn->wait_all_responses;
79 0           return $self->_deserialize( grep defined, @elem );
80             }
81              
82             sub flush_queue {
83 0     0 1   my $self = shift;
84 0           $self->_redis_conn->del($self->queue_name);
85             }
86              
87             sub queue_length {
88 0     0 1   my $self = shift;
89 0           my ($len) = $self->_redis_conn->llen($self->queue_name);
90 0           return $len;
91             }
92              
93             sub _serialize {
94 0     0     my $self = shift;
95 0   0       $SerealEncoder ||= Sereal::Encoder->new({stringify_undef => 1, warn_undef => 1});
96 0           return map $SerealEncoder->encode($_), @_;
97             }
98              
99             sub _deserialize {
100 0     0     my $self = shift;
101 0   0       $SerealDecoder ||= Sereal::Decoder->new();
102 0 0         return map defined($_) ? $SerealDecoder->decode($_) : $_, @_;
103             }
104              
105             1;
106              
107             __END__