File Coverage

blib/lib/Queue/Q/NaiveFIFO/Redis.pm
Criterion Covered Total %
statement 27 72 37.5
branch 0 16 0.0
condition 0 10 0.0
subroutine 9 19 47.3
pod 7 7 100.0
total 43 124 34.6


line stmt bran cond sub pod time code
1             package Queue::Q::NaiveFIFO::Redis;
2 3     3   177219 use strict;
  3         8  
  3         85  
3 3     3   16 use warnings;
  3         7  
  3         85  
4 3     3   16 use Carp qw(croak);
  3         5  
  3         181  
5              
6 3     3   1043 use Queue::Q::NaiveFIFO;
  3         8  
  3         88  
7 3     3   2133 use parent 'Queue::Q::NaiveFIFO';
  3         932  
  3         21  
8              
9 3     3   1318 use Redis;
  3         83001  
  3         1176  
10 3     3   57 use Sereal::Encoder;
  3         7  
  3         281  
11 3     3   20 use Sereal::Decoder;
  3         8  
  3         323  
12              
13             our $SerealEncoder;
14             our $SerealDecoder;
15              
16             use Class::XSAccessor {
17 3         39 getters => [qw(server port queue_name db _redis_conn)],
18 3     3   2159 };
  3         7272  
19              
20             sub new {
21 0     0 1   my ($class, %params) = @_;
22 0           for (qw(server port queue_name)) {
23             croak("Need '$_' parameter")
24 0 0         if not defined $params{$_};
25             }
26              
27             my $self = bless({
28 0           (map {$_ => $params{$_}} qw(server port queue_name) ),
29 0   0       db => $params{db} || 0,
30             _redis_conn => undef,
31             } => $class);
32              
33             $self->{_redis_conn} = Redis->new(
34 0 0         %{$params{redis_options} || {}},
  0            
35             encoding => undef, # force undef for binary data
36             server => join(":", $self->server, $self->port),
37             );
38              
39 0 0         $self->_redis_conn->select($self->db) if $self->db;
40              
41 0           return $self;
42             }
43              
44             sub enqueue_item {
45 0     0 1   my $self = shift;
46 0 0         croak("Need exactly one item to enqeue")
47             if not @_ == 1;
48 0           my ($blob) = $self->_serialize($_[0]);
49 0           $self->_redis_conn->lpush($self->queue_name, $blob);
50             }
51              
52             sub enqueue_items {
53 0     0 1   my $self = shift;
54 0 0         return if not @_;
55 0           my $qn = $self->queue_name;
56 0           my $conn = $self->_redis_conn;
57 0           my @blobs = $self->_serialize(@_);
58 0           $conn->lpush($qn, @blobs);
59             }
60              
61             sub claim_item {
62 0     0 1   my ($self) = @_;
63 0           my ($rv) = $self->_deserialize( $self->_redis_conn->rpop($self->queue_name) );
64 0           return $rv;
65             }
66              
67             sub claim_items {
68 0     0 1   my ($self, $n) = @_;
69 0   0       $n ||= 1;
70 0           my $conn = $self->_redis_conn;
71 0           my $qn = $self->queue_name;
72 0 0         if ($n > 100) {
73 0           my ($l) = $self->_redis_conn->llen($qn);
74 0 0         $n = $l if $l < $n;
75             }
76 0           my @elem;
77 0     0     $conn->rpop($qn, sub {push @elem, $_[0]}) for 1..$n;
  0            
78 0           $conn->wait_all_responses;
79 0           return $self->_deserialize( grep defined, @elem );
80             }
81              
82             sub flush_queue {
83 0     0 1   my $self = shift;
84 0           $self->_redis_conn->del($self->queue_name);
85             }
86              
87             sub queue_length {
88 0     0 1   my $self = shift;
89 0           my ($len) = $self->_redis_conn->llen($self->queue_name);
90 0           return $len;
91             }
92              
93             sub _serialize {
94 0     0     my $self = shift;
95 0   0       $SerealEncoder ||= Sereal::Encoder->new({stringify_undef => 1, warn_undef => 1});
96 0           return map $SerealEncoder->encode($_), @_;
97             }
98              
99             sub _deserialize {
100 0     0     my $self = shift;
101 0   0       $SerealDecoder ||= Sereal::Decoder->new();
102 0 0         return map defined($_) ? $SerealDecoder->decode($_) : $_, @_;
103             }
104              
105             1;
106              
107             __END__