File Coverage

blib/lib/Redis/Handle.pm
Criterion Covered Total %
statement 15 61 24.5
branch 0 18 0.0
condition 0 6 0.0
subroutine 5 15 33.3
pod 1 1 100.0
total 21 101 20.7


line stmt bran cond sub pod time code
1             package Redis::Handle;
2 1     1   24491 use strict;
  1         2  
  1         37  
3 1     1   6 use warnings;
  1         1  
  1         27  
4 1     1   5 use Carp;
  1         2  
  1         91  
5 1     1   1166 use Redis;
  1         459537  
  1         29  
6 1     1   884 use AnyEvent::Redis;
  1         52292  
  1         684  
7              
8             our $VERSION = '0.2.0'; # VERSION
9             # ABSTRACT: Tie::Handle interface for Redis queues
10              
11             # use Data::Dump qw(pp);
12              
13             =head1 NAME
14              
15             Redis::Handle - A filehandle tie for a Redis queue
16              
17             =head1 SYNOPSIS
18              
19             tie *REDIS, 'Redis::Handle';
20             print REDIS "Foo bar baz\n";
21             print while ; # Prints "Foo bar baz\n"
22            
23             print REDIS "Foo", "Bar";
24             my @baz = ; # @baz is now ("Foo","Bar")
25            
26             print REDIS "Foo", "Bar";
27             print ; # Prints "Foo"
28              
29             =head1 DESCRIPTION
30              
31             C implements a tie interface to a Redis queue so that you can
32             treat said queue as a filehandle. Pushing to the front of the queue is the same
33             as a C, and popping off the end of the queue is like a C.
34              
35             =cut
36              
37             {
38             my $timeout = 30; # For BLPOPs, in seconds
39             my $redis; # We want only a single Redis connection
40             my %redis; # Connection information
41              
42             =head1 METHODS
43              
44             =head2 TIEHANDLE
45              
46             Ties the filehandle to the clientId in Redis.
47              
48             =head3 Usage
49              
50             tie *CLIENT, "Redis::Handle", $clientId;
51            
52             tie *CLIENT, 'Redis::Handle', $clientId,
53             timeout => 100,
54             host => 'example.com',
55             port => 5800;
56            
57             # pass an existing Redis connection
58             tie *CLIENT, 'Redis::Handle', $clientId, $redis;
59              
60             =cut
61              
62             sub TIEHANDLE {
63 0     0     my ($class,$clientId) = (+shift,+shift);
64 0 0         $redis = shift if ref $_[0];
65 0           %redis = @_;
66 0   0       $redis ||= Redis->new(%redis);
67              
68 0 0         if ($redis{timeout}) {
69 0           $timeout = $redis{timeout};
70 0           delete $redis{timeout};
71             }
72              
73 0           bless \$clientId, $class;
74             }
75              
76             =head2 PRINT
77              
78             Sends the message(s) to the client. Since we're using an AnyEvent connection,
79             events are still processed while waiting on Redis to process the push,
80             including asynchronously pushing _other_ messages.
81              
82             =head3 Usage
83              
84             print CLIENT nfreeze({ text => "foo", from => "bar" });
85             print CLIENT nfreeze({ text => "foo" }), nfreeze({ text => "bar" }), "System message";
86              
87             =cut
88             sub PRINT {
89 0     0     my $this = shift;
90 0 0         $redis->ping or $redis = Redis->new(%redis);
91 0           foreach (@_) {
92 0 0         $redis->lpush($$this, $_) or
93             croak qq{Failed to push message [$_] to [$$this]: $!};
94             }
95 0           return 1;
96             }
97              
98             =head2 READLINE
99              
100             Reads the next message or flushes the message queue (depending on context).
101             This is a "blocking" operation, but, because we're using AnyEvent::Redis, other
102             events are still processed while we wait. Since Redis's C operation
103             blocks the whole connection, this spawns a separate AnyEvent::Redis connection
104             to deal with the blocking operation.
105              
106             =head3 Usage
107              
108             my $message = ; # Reads only the next one
109             my @messages = ; # Flushes the message queue into @messages
110              
111             =cut
112             sub READLINE {
113 0     0     my $this = shift;
114 0 0         $redis->ping or $redis = Redis->new(%redis);
115 0 0         my $r = AnyEvent::Redis->new(%redis) or
116 0           croak qq(Couldn't create AnyEvent::Redis connection to [@{[%redis]}]: $!);
117 0           my $message;
118 0           until ($message) {
119             my $cv = $r->brpop($$this, $timeout, sub {
120 0     0     $message = $_[0][1];
121 0 0         }) or croak qq{Couldn't BRPOP from [$$this]: $!};
122 0           $cv->recv;
123             }
124 0           $r->quit; undef $r;
  0            
125 0 0         return $message unless wantarray;
126 0           return ($message, _flush($this));
127             }
128              
129             =for READLINE
130              
131             Helper methods for READLINE
132              
133             If you pass C<_flush> a nonzero number, it will read that many messages. An
134             explicit "0" means "read nothing", while an C means "read everything".
135              
136             =cut
137             sub _flush {
138 0     0     my ($this,$count) = @_;
139 0           my @messages;
140 0           while (my $m = $redis->rpop($$this)) {
141 0 0 0       last if defined $count && --$count < 0;
142 0           push @messages, $m;
143             }
144 0           return @messages;
145             }
146              
147             =head2 EOF
148              
149             Just like the regular C call. Returns 1 if the next read will be the end
150             of the queue or if the queue isn't open.
151              
152             =cut
153             sub EOF {
154 0     0     my $this = shift;
155 0           return not _len($this);
156             }
157              
158             =for EOF,READLINE
159              
160             Returns the length of the buffer.
161              
162             =cut
163             sub _len {
164 0     0     my $this = shift;
165 0           return $redis->llen($$this);
166             }
167              
168             =head2 poll_once
169              
170             Returns the C of the a blocking pop operation on a Redis queue.
171             This is useful if, for example, you want to handle a C as an asynchronous
172             PSGI handler, since a standard C operation throws a "recursive blocking
173             wait" exception (because you're waiting on a C that's waiting on a
174             C). It takes a C variable, an optional C of the maximum
175             number of messages to return, and a callback as its arguments.
176              
177             =head3 Usage
178              
179             sub get {
180             my ($self,$clientId) = (+shift,+shift);
181             my $output = tie local *CLIENT, 'Redis::MessageQueue', "$clientId:out";
182             $output->poll_once(sub {
183             $self->write(+shift);
184             $self->finish;
185             });
186             }
187              
188             =cut
189             sub poll_once {
190 0     0 1   my $fn = pop;
191 0           my ($this,$count) = @_;
192 0           my $r = AnyEvent::Redis->new(%redis);
193             $r->brpop($$this, $timeout, sub {
194 0     0     my $message = $_[0][1];
195 0           $r->quit; undef $r;
  0            
196 0           return $fn->($message, _flush($this,$count));
197 0           });
198             }
199              
200             =head2 CLOSE
201              
202             Cleanup code so that we don't end up with a bunch of open filehandles.
203              
204             =cut
205             sub CLOSE {
206             # The elements of @_ are *aliases*, not copies, so undefing $_[0] marks
207             # the caller's typeglob as empty.
208 0     0     $redis->ping;
209 0           undef $_[0];
210             }
211              
212             }
213             1;