File Coverage

blib/lib/MojoX/Redis.pm
Criterion Covered Total %
statement 107 130 82.3
branch 17 32 53.1
condition 11 20 55.0
subroutine 20 22 90.9
pod 3 5 60.0
total 158 209 75.6


line stmt bran cond sub pod time code
1             package MojoX::Redis;
2              
3 2     2   188220 use strict;
  2         2  
  2         57  
4 2     2   6 use warnings;
  2         2  
  2         71  
5              
6             our $VERSION = 0.87;
7 2     2   7 use base 'Mojo::Base';
  2         6  
  2         153  
8              
9 2     2   8 use Mojo::IOLoop;
  2         2  
  2         16  
10 2     2   33 use List::Util ();
  2         2  
  2         17  
11 2     2   6 use Scalar::Util ();
  2         2  
  2         20  
12 2     2   8 use Encode ();
  2         2  
  2         2839  
13             require Carp;
14              
15             __PACKAGE__->attr(server => '127.0.0.1:6379');
16             __PACKAGE__->attr(ioloop => sub { Mojo::IOLoop->singleton });
17             __PACKAGE__->attr(error => undef);
18             __PACKAGE__->attr(timeout => 300);
19             __PACKAGE__->attr(encoding => 'UTF-8');
20             __PACKAGE__->attr(
21             on_error => sub {
22             sub {
23             my $redis = shift;
24             warn "Redis error: ", $redis->error, "\n";
25             }
26             }
27             );
28              
29             __PACKAGE__->attr(
30             protocol_redis => sub {
31             require Protocol::Redis;
32             "Protocol::Redis";
33             }
34             );
35              
36             our @COMMANDS = qw/
37             append auth bgrewriteaof bgsave blpop brpop brpoplpush config_get config_set
38             config_resetstat dbsize debug_object debug_segfault decr decrby del discard
39             echo exec exists expire expireat flushall flushdb get getbit getrange getset
40             hdel hexists hget hgetall hincrby hkeys hlen hmget hmset hset hsetnx hvals
41             incr incrby info keys lastsave lindex linsert llen lpop lpush lpushx lrange
42             lrem lset ltrim mget monitor move mset msetnx multi persist ping psubscribe
43             publish punsubscribe quit randomkey rename renamenx rpop rpoplpush rpush
44             rpushx sadd save scard sdiff sdiffstore select set setbit setex setnx
45             setrange shutdown sinter sinterstore sismember slaveof smembers smove sort
46             spop srandmember srem strlen subscribe sunion sunionstore sync ttl type
47             unsubscribe unwatch watch zadd zcard zcount zincrby zinterstore zrange
48             zrangebyscore zrank zrem zremrangebyrank zremrangebyscore zrevrange
49             zrevrangebyscore zrevrank zscore zunionstore
50             /;
51              
52             sub AUTOLOAD {
53 1     1   43 my ($package, $cmd) = our $AUTOLOAD =~ /^([\w\:]+)\:\:(\w+)$/;
54              
55             Carp::croak(qq|Can't locate object method "$cmd" via "$package"|)
56 1 50   81   14 unless List::Util::first { $_ eq $cmd } @COMMANDS;
  81         47  
57              
58 1         4 my $self = shift;
59              
60 1         2 my $args = [@_];
61 1         2 my $cb = $args->[-1];
62 1 50       4 if (ref $cb ne 'CODE') {
63 0         0 $cb = undef;
64             }
65             else {
66 1         2 pop @$args;
67             }
68              
69 1         4 $self->execute($cmd, $args, $cb);
70             }
71              
72             sub DESTROY {
73 0     0   0 my $self = shift;
74              
75             # Loop
76 0 0       0 return unless my $loop = $self->ioloop;
77              
78             # Cleanup connection
79 0 0       0 $loop->remove($self->{_connection})
80             if $self->{_connection};
81             }
82              
83             sub connect {
84 1     1 1 2 my $self = shift;
85              
86             # drop old connection
87 1 50       2 if ($self->connected) {
88 0         0 $self->ioloop->remove($self->{_connection});
89             }
90              
91 1         3 $self->server =~ m{^([^:]+)(:(\d+))?};
92 1         7 my $address = $1;
93 1   50     3 my $port = $3 || 6379;
94              
95 1         3 Scalar::Util::weaken $self;
96              
97 1         2 $self->{_protocol} = $self->_create_protocol;
98              
99             # connect
100 1         3 $self->{_connecting} = 1;
101             $self->{_connection} = $self->ioloop->client(
102             { address => $address,
103             port => $port
104             },
105             sub {
106 1     1   1358 my ($loop, $err, $stream) = @_;
107              
108              
109 1         2 delete $self->{_connecting};
110 1         3 $stream->timeout($self->timeout);
111 1         48 $self->_send_next_message;
112              
113             $stream->on(
114             read => sub {
115 4         202765 my ($stream, $chunk) = @_;
116 4         19 $self->{_protocol}->parse($chunk);
117             }
118 1         5 );
119             $stream->on(
120             close => sub {
121 1         1001982 my $str = shift;
122 1   50     12 $self->{error} ||= 'disconnected';
123 1         4 $self->_inform_queue;
124              
125 1         2 delete $self->{_message_queue};
126              
127 1         2 delete $self->{_connecting};
128 1         3 delete $self->{_connection};
129             }
130 1         7 );
131             $stream->on(
132             error => sub {
133 0         0 my ($str, $error) = @_;
134 0         0 $self->error($error);
135 0         0 $self->_inform_queue;
136              
137 0         0 $self->on_error->($self);
138 0         0 $self->ioloop->remove($self->{_connection});
139             }
140 1         5 );
141              
142             }
143 1         25 );
144              
145 1         238 return $self;
146             }
147              
148             sub connected {
149 1     1 0 2 my $self = shift;
150              
151 1         3 return $self->{_connection};
152             }
153              
154             sub execute {
155 5     5 1 2835 my ($self, $command, $args, $cb) = @_;
156              
157 5 50 33     24 if (!$cb && ref $args eq 'CODE') {
    100          
158 0         0 $cb = $args;
159 0         0 $args = [];
160             }
161             elsif (!ref $args) {
162 4         10 $args = [$args];
163             }
164              
165 5         14 unshift @$args, uc $command;
166              
167 5   100     24 my $mqueue = $self->{_message_queue} ||= [];
168 5   100     14 my $cqueue = $self->{_cb_queue} ||= [];
169              
170              
171 5         8 push @$mqueue, $args;
172 5         5 push @$cqueue, $cb;
173              
174 5 100       13 $self->connect unless $self->{_connection};
175 5         9 $self->_send_next_message;
176              
177 5         18 return $self;
178             }
179              
180             sub start {
181 1     1 1 2 my ($self) = @_;
182              
183 1         3 $self->ioloop->start;
184 1         158 return $self;
185             }
186              
187             sub stop {
188 0     0 0 0 my ($self) = @_;
189              
190 0         0 $self->ioloop->stop;
191 0         0 return $self;
192             }
193              
194             sub _create_protocol {
195 1     1   2 my $self = shift;
196              
197 1         2 my $protocol = $self->protocol_redis->new(api => 1);
198             $protocol->on_message(
199             sub {
200 3     3   116 my ($parser, $command) = @_;
201 3         9 $self->_return_command_data($command);
202             }
203 1         30 );
204              
205 1 50       6 Carp::croak(q/Protocol::Redis implementation doesn't support APIv1/)
206             unless $protocol;
207              
208 1         3 $protocol;
209             }
210              
211             sub _send_next_message {
212 6     6   9 my ($self) = @_;
213              
214 6 100 66     29 if ((my $id = $self->{_connection}) && !$self->{_connecting}) {
215 5         5 while (my $args = shift @{$self->{_message_queue}}) {
  10         185  
216 5         7 my $cmd_arg = [];
217 5         10 my $cmd = {type => '*', data => $cmd_arg};
218 5         8 foreach my $token (@$args) {
219 11 50       26 $token = Encode::encode($self->encoding, $token)
220             if $self->encoding;
221 11         552 push @$cmd_arg, {type => '$', data => $token};
222             }
223 5         14 my $message = $self->{_protocol}->encode($cmd);
224              
225 5         152 $self->ioloop->stream($id)->write($message);
226             }
227             }
228             }
229              
230              
231             sub _reencode_message {
232 3     3   3 my ($self, $message) = @_;
233              
234 3         5 my ($type, $data) = @{$message}{'type', 'data'};
  3         9  
235              
236             # Decode data
237 3 50 33     18 if ($type ne '*' && $self->encoding && $data) {
      33        
238 3         32 $data = Encode::decode($self->encoding, $data);
239             }
240              
241 3 50       158 if ($type eq '-') {
    50          
242 0         0 $self->error($data);
243 0         0 $self->on_error->($self);
244 0         0 return;
245             }
246             elsif ($type ne '*') {
247 3         8 return [$data];
248             }
249             else {
250 0         0 my $reencoded_data = [];
251 0         0 foreach my $item (@$data) {
252 0         0 my $message = $self->_reencode_message($item);
253 0         0 push @$reencoded_data, $message;
254             }
255 0         0 return $reencoded_data;
256             }
257             }
258              
259             sub _return_command_data {
260 3     3   5 my ($self, $message) = @_;
261              
262 3         10 my $data = $self->_reencode_message($message);
263              
264 3         4 my $cb = shift @{$self->{_cb_queue}};
  3         6  
265 3 50       13 $cb->($self, $data) if $cb;
266              
267             # Reset error after callback dispatching
268 3         11 $self->error(undef);
269             }
270              
271              
272             sub _inform_queue {
273 1     1   2 my ($self) = @_;
274              
275 1         2 for my $cb (@{$self->{_cb_queue}}) {
  1         4  
276 2 50       14 $cb->($self) if $cb;
277             }
278 1         31 $self->{_queue} = [];
279             }
280              
281             1;
282             __END__