File Coverage

blib/lib/Danga/Socket/Redis.pm
Criterion Covered Total %
statement 31 142 21.8
branch 4 64 6.2
condition 2 18 11.1
subroutine 7 15 46.6
pod 0 5 0.0
total 44 244 18.0


line stmt bran cond sub pod time code
1             package Danga::Socket::Redis;
2 1     1   26327 use strict;
  1         2  
  1         40  
3 1     1   947 use IO::Socket;
  1         44959  
  1         5  
4 1     1   1451 use Danga::Socket::Callback;
  1         27736  
  1         36  
5              
6             =head1 NAME
7              
8             Danga::Socket::Redis - An asynchronous redis client.
9              
10             =head1 SYNOPSIS
11              
12             use Danga::Socket::Redis;
13              
14             my $rs = Danga::Socket::Redis->new ( connected => \&redis_connected );
15            
16             sub redis_connected {
17             $rs->set ( "key", "value" );
18             $rs->get ( "key", sub { my ( $self, $value ) = @_; print "$key = $value\n" } );
19             $rs->publish ( "newsfeed", "Twitter is down" );
20             $rs->hset ( "hkey", "field", "value" );
21             $rs->hget ( "hkey", "field", sub { my ( $self, $value ) = @_ } );
22             $rs->subscribe ( "newsfeed", sub { my ( $self, $msg ) = @_ } );
23             }
24            
25             Danga::Socket->EventLoop;
26              
27              
28             =head1 DESCRIPTION
29              
30             An asynchronous client for the key/value store redis. Asynchronous
31             basically means a method does not block. A supplied callback will be
32             called with the results when they are ready.
33              
34             =head1 USAGE
35              
36              
37              
38             =head1 BUGS
39              
40             Only started, a lot of redis functions need to be added.
41              
42              
43             =head1 SUPPORT
44              
45             dm @martinredmond
46             martin @ tinychat.com
47              
48             =head1 AUTHOR
49              
50             Martin Redmond
51             CPAN ID: REDS
52             Tinychat.com
53             @martinredmond
54             http://Tinychat.com/about.php
55              
56             =head1 COPYRIGHT
57              
58             This program is free software; you can redistribute
59             it and/or modify it under the same terms as Perl itself.
60              
61             The full text of the license can be found in the
62             LICENSE file included with this module.
63              
64              
65             =head1 SEE ALSO
66              
67             perl(1).
68              
69             =cut
70              
71              
72             BEGIN {
73 1     1   7 use Exporter ();
  1         1  
  1         16  
74 1     1   4 use vars qw($VERSION @ISA @EXPORT @EXPORT_OK %EXPORT_TAGS);
  1         2  
  1         91  
75 1     1   2 $VERSION = '0.06';
76 1         16 @ISA = qw(Exporter);
77 1         2 @EXPORT = qw();
78 1         3 @EXPORT_OK = qw(set get
79             hset hget
80             publish subscribe);
81 1         1759 %EXPORT_TAGS = ();
82             }
83              
84             our $AUTOLOAD;
85              
86             our %cmds = (
87             exists => { args => 1 },
88             del => { args => 1 },
89             type => { args => 1 },
90             keys => { args => 1 },
91             randomkey => { args => 0 },
92             rename => { args => 2 },
93             renamenx => { args => 2 },
94             dbsize => { args => 0 },
95             expire => { args => 2 },
96             ttl => { args => 2 },
97             select => { args => 1 },
98             move => { args => 2 },
99             flushdb => { args => 0 },
100             flushall => { args => 0 },
101              
102             set => { args => 2 },
103             get => { args => 1 },
104             getset => { args => 2 },
105             mget => { margs => 1 },
106             setnx => { args => 2 },
107             setex => { args => 3 },
108             mset => { margs => 1 },
109             msetnx => { margs => 1 },
110             incr => { args => 1 },
111             incrby => { args => 1 },
112             decr => { args => 1 },
113             decrby => { args => 1 },
114             append => { args => 2 },
115             substr => { args => 3 },
116              
117             rpush => { args => 2 },
118             lpush => { args => 2 },
119             llen => { args => 1 },
120             lrange => { args => 2 },
121             ltrim => { args => 3 },
122             lindex => { args => 2 },
123             lset => { args => 3 },
124             lrem => { args => 3 },
125             lpop => { args => 1 },
126             rpop => { args => 1 },
127             blpop => { margs => 1 },
128             brpop => { margs => 1 },
129             rpoplpush => { args => 2 },
130              
131             sadd => { args => 2 },
132             srem => { args => 2 },
133             spop => { args => 1 },
134             smove => { args => 3 },
135             scard => { args => 1 },
136             sismember => { args => 2 },
137             sinter => { margs => 1 },
138             sinterstore => { margs => 1 },
139             sunion => { margs => 1 },
140             sunionstore => { margs => 1 },
141             sdiff => { margs => 1 },
142              
143             smembers => { args => 1 },
144             srandmember => { args => 1 },
145             sdiffstore => { margs => 1 },
146            
147             zadd => { args => 3 },
148             zrem => { args => 2 },
149             zincrby => { args => 3 },
150             zrank => { args => 2 },
151             zrevrank => { args => 2 },
152             zrange => { args => 3 },
153             zrevrange => { args => 3 },
154             zrangebyscore => { args => 3 },
155             zcount => { args => 4 },
156             zcard => { args => 1 },
157             zscore => { args => 0 },
158             zremrangebyrank => { args => 0 },
159             zremrangebyscore => { args => 0 },
160             zunionstore => { args => 0 },
161              
162             hset => { args => 3 },
163             hget => { args => 2 },
164             hmget => { margs => 1 },
165             hmset => { margs => 1 },
166             hincrby => { args => 0 },
167             hexists => { args => 2 },
168             hdel => { args => 2 },
169             hlen => { args => 1 },
170             hkeys => { args => 1 },
171             hvals => { args => 1 },
172             hgetall => { args => 1 },
173              
174             subscribe => { args => 1 },
175             unsubscribe => { args => 1 },
176             publish => { args => 2 },
177              
178             # * MULTI/EXEC/DISCARD/WATCH/UNWATCH Redis atomic transactions
179             sort => { args => 0 },
180             save => { args => 0 },
181             bgsave => { args => 0 },
182             lastsave => { args => 0 },
183             shutdown => { args => 0 },
184             bgrewriteaof => { args => 0 },
185              
186             info => { args => 0 },
187             monitor => { args => 0 },
188             slaveof => { args => 0 },
189             config => { args => 0 },
190             );
191              
192             1;
193              
194             sub new {
195 1     1 0 16 my ($class, %args) = @_;
196 1   33     9 my $self = bless ({}, ref ($class) || $class);
197 1         3 my $peeraddr = "localhost:6379";
198 1 50       5 $peeraddr = "$args{host}:6379" if $args{host};
199 1 50       3 $peeraddr = "localhost:$args{port}" if $args{port};
200 1 50 33     5 $peeraddr = "$args{host}:$args{port}" if $args{host} && $args{port};
201 1         15 my $sock = IO::Socket::INET->new (
202             PeerAddr => $peeraddr,
203             Blocking => 0,
204             );
205 1 50       1467 $self->{connected_cb} = $args{connected} if $args{connected};
206 1         2 my $a = '';
207             $self->{rs} = Danga::Socket::Callback->new
208             (
209             handle => $sock,
210             context => { buf => \$a, rs => $self },
211             on_read_ready => sub {
212 0     0   0 my $self = shift;
213 0         0 my $bref = $self->read ( 1024 * 8 );
214 0         0 my $buf = $self->{context}->{buf};
215 0 0       0 if ( $bref ) {
216 0 0       0 $buf = length ( $$buf ) > 0 ?
217             \ ($$buf . $$bref) :
218             $bref;
219 0         0 $self->{context}->{buf} = $self->{context}->{rs}->do_buf ( $buf );
220             } else {
221 0         0 $self->close ( 'read' );
222 0         0 die "reading from redis";
223             }
224             },
225             on_write_ready => sub {
226 0     0   0 my $self = shift;
227 0         0 $self->watch_write ( 0 );
228 0         0 my $cb = delete $self->{context}->{rs}->{connected_cb};
229 0 0       0 &$cb ( $self->{context}->{rs} ) if $cb;
230             }
231 1         19 );
232 1         4147 return bless $self;
233             }
234              
235             sub do_buf {
236 0     0 0   my ( $self, $buf ) = @_;
237 0           my $o;
238 0           while ( 1 ) {
239 0           ( $buf, $o ) =
240             $self->redis_read ( $buf );
241 0 0         last unless $o;
242 0           $self->redis_process ( $o );
243             }
244 0           return $buf; # there may be some stuff left over from this read
245             }
246              
247             sub redis_read {
248 0     0 0   my ( $self, $bref ) = @_;
249 0 0         return ( $bref, undef ) if length ( $$bref ) == 0;
250 0           my $nlpos = index ( $$bref, "\n" );
251 0 0         return ( $bref, undef ) if $nlpos == -1;
252 0           my $tok = substr ( $$bref, 0, 1 );
253 0 0         if ( $tok eq ':' ) {
    0          
    0          
    0          
    0          
254 0           my $n = substr ( $$bref, 1, $nlpos - 2 );
255 0           my $r = substr ( $$bref, $nlpos + 1 );
256 0           return ( \$r, { type => 'int', value => $n } );
257             } elsif ( $tok eq '-' ) {
258 0           my $e = substr ( $$bref, 1, $nlpos - 2 );
259 0           my $r = substr ( $$bref, $nlpos + 1 );
260 0           return ( \$r, { type => 'error', value => $e } );
261             } elsif ( $tok eq '+' ) {
262 0           my $l = substr ( $$bref, 1, $nlpos - 2 );
263 0           my $r = substr ( $$bref, $nlpos + 1 );
264 0           return ( \$r, { type => 'line', value => $l } );
265             } elsif ( $tok eq '$' ) {
266 0           my $l = substr ( $$bref, 1, $nlpos - 2 );
267 0 0         if ( $l == -1 ) {
268 0           my $r = substr ( $$bref, $nlpos + 1 );
269 0           return ( \$r, { type => 'bulkerror' } );
270             }
271             # warn "better check this" if length ( $$bref ) < $nlpos + 1 + $l + 2;
272 0 0         return ( $bref, undef ) if length ( $$bref ) < $nlpos + 1 + $l + 2; # need more data
273 0           my $v = substr ( $$bref, $nlpos + 1, $l );
274 0           my $r = substr ( $$bref, $nlpos + $l + 1 + 2 );
275 0           return ( \$r, { type => 'bulk', value => $v } );
276             } elsif ( $tok eq '*' ) {
277 0           my $l = substr ( $$bref, 1, $nlpos - 2 );
278 0 0         if ( $l == -1 ) {
279 0           my $r = substr ( $$bref, $nlpos + 1 );
280 0           return ( \$r, { type => 'multibulkerror' } );
281             }
282 0           my $obref = $bref;
283 0           my $r = substr ( $$bref, $nlpos + 1 );
284 0           $bref = \$r;
285 0           my @res;
286 0           while ( $l-- ) {
287 0           my $o;
288 0           ( $bref, $o ) = $self->redis_read ( $bref );
289 0 0         return $obref unless $o; # read more?
290 0           push @res, $o;
291             }
292 0           return ( $bref, { type => 'bulkmulti', values => \@res } );
293             } else {
294 0           die "Danga::Socket::Redis bref", $$bref;
295             }
296             }
297              
298             sub redis_process {
299 0     0 0   my ( $self, $o ) = @_;
300 0           my $v = $o->{values};
301 0 0 0       if ( $v && $v->[0]->{value} eq 'message' ) {
302 0 0         if ( my $cb = $self->{subscribe}->{callback}->{$v->[1]->{value}} ) {
303 0           &$cb ( $self, $v->[2]->{value}, $o );
304             }
305 0           return;
306             }
307 0           my $cmd = shift @{$self->{cmdqueue}};
  0            
308 0 0         if ( my $cb = $cmd->{callback} ) {
309 0 0         if ( $o->{type} eq 'bulkerror' ) {
310 0           &$cb ( $self, $o );
311             } else {
312 0 0         if ( $o->{type} eq 'bulkmulti' ) {
313 0           my @vs = map { $_->{value} } @{$o->{values}};
  0            
  0            
314 0           &$cb ( $self, \@vs, $o );
315             } else {
316 0           &$cb ( $self, $o->{value}, $o );
317             }
318             }
319             }
320             }
321              
322 0     0     sub DESTROY {}
323              
324             sub AUTOLOAD {
325 0     0     my $self = shift;
326 0           my $cc = $AUTOLOAD;
327 0           $cc =~ s/.*:://;
328 0           $cc = lc $cc;
329              
330 0           my $opts = $Danga::Socket::Redis::cmds{$cc};
331 0 0         return undef unless $opts;
332              
333 0           my $cmd = { type => $cc };
334 0 0         if ( $opts->{args} > 0 ) {
    0          
335 0           push @{$cmd->{args}}, shift for 1 .. $opts->{args};
  0            
336 0           $cmd->{callback} = shift;
337 0           $cmd->{options} = shift;
338             } elsif ( $opts->{margs} == 1 ) {
339 0           my $last = pop @_;
340 0 0         if ( ref $last eq 'HASH' ) {
341 0           $cmd->{options} = $last;
342 0           $last = pop @_;
343             }
344 0 0         if ( ref $last eq 'CODE' ) {
345 0           $cmd->{callback} = $last;
346             } else {
347 0           push @_, $last;
348             }
349 0           @{$cmd->{args}} = @_;
  0            
350             }
351 0 0 0       if ( $cc eq 'subscribe' && $cmd->{callback} && $cmd->{args} &&
  0   0        
      0        
352             scalar @{$cmd->{args}} == 1 ) {
353 0           $self->{subscribe}->{callback}->{$cmd->{args}->[0]} = $cmd->{callback};
354             }
355 0           $self->redis_send ( $cmd );
356             }
357              
358             sub redis_send {
359 0     0 0   my ( $self, $cmd ) = @_;
360 0 0         $cmd->{args} = [] unless ref $cmd->{args} eq 'ARRAY';
361 0 0         unless ( $cmd->{type} eq 'subscribe' ) {
362 0           push @{$self->{cmdqueue}}, $cmd;
  0            
363             }
364 0           my $send = "*" . ( scalar ( @{$cmd->{args}} ) + 1 ) . "\r\n" .
  0            
365             "\$" . length ( $cmd->{type} ) . "\r\n" .
366             $cmd->{type} . "\r\n";
367 0           foreach ( @{$cmd->{args}} ) {
  0            
368 0           $send .= "\$" . length ($_) . "\r\n$_\r\n";
369             }
370 0           $self->{rs}->write ( $send );
371             }