File Coverage

blib/lib/MojoX/Redis.pm
Criterion Covered Total %
statement 108 135 80.0
branch 18 34 52.9
condition 11 20 55.0
subroutine 20 22 90.9
pod 3 5 60.0
total 160 216 74.0


line stmt bran cond sub pod time code
1             package MojoX::Redis;
2              
3 2     2   125273 use strict;
  2         3  
  2         49  
4 2     2   7 use warnings;
  2         2  
  2         61  
5              
6             our $VERSION = 0.88;
7 2     2   6 use base 'Mojo::Base';
  2         5  
  2         130  
8              
9 2     2   7 use Mojo::IOLoop;
  2         2  
  2         13  
10 2     2   31 use List::Util ();
  2         1  
  2         16  
11 2     2   6 use Scalar::Util ();
  2         1  
  2         19  
12 2     2   6 use Encode ();
  2         2  
  2         2741  
13             require Carp;
14              
15             __PACKAGE__->attr(server => '127.0.0.1:6379');
16             __PACKAGE__->attr(ioloop => sub { Mojo::IOLoop->singleton });
17             __PACKAGE__->attr(error => undef);
18             __PACKAGE__->attr(timeout => 300);
19             __PACKAGE__->attr(encoding => 'UTF-8');
20             __PACKAGE__->attr(
21             on_error => sub {
22             sub {
23             my $redis = shift;
24             warn "Redis error: ", $redis->error, "\n";
25             }
26             }
27             );
28              
29             __PACKAGE__->attr(
30             protocol_redis => sub {
31             require Protocol::Redis;
32             "Protocol::Redis";
33             }
34             );
35              
36             our @COMMANDS = qw/
37             append auth bgrewriteaof bgsave blpop brpop brpoplpush config_get config_set
38             config_resetstat dbsize debug_object debug_segfault decr decrby del discard
39             echo exec exists expire expireat flushall flushdb get getbit getrange getset
40             hdel hexists hget hgetall hincrby hkeys hlen hmget hmset hset hsetnx hvals
41             incr incrby info keys lastsave lindex linsert llen lpop lpush lpushx lrange
42             lrem lset ltrim mget monitor move mset msetnx multi persist ping psubscribe
43             publish punsubscribe quit randomkey rename renamenx rpop rpoplpush rpush
44             rpushx sadd save scard sdiff sdiffstore select set setbit setex setnx
45             setrange shutdown sinter sinterstore sismember slaveof smembers smove sort
46             spop srandmember srem strlen subscribe sunion sunionstore sync ttl type
47             unsubscribe unwatch watch zadd zcard zcount zincrby zinterstore zrange
48             zrangebyscore zrank zrem zremrangebyrank zremrangebyscore zrevrange
49             zrevrangebyscore zrevrank zscore zunionstore
50             /;
51              
52             sub AUTOLOAD {
53 1     1   53 my ($package, $cmd) = our $AUTOLOAD =~ /^([\w\:]+)\:\:(\w+)$/;
54              
55             Carp::croak(qq|Can't locate object method "$cmd" via "$package"|)
56 1 50   81   15 unless List::Util::first { $_ eq $cmd } @COMMANDS;
  81         47  
57              
58 1         3 my $self = shift;
59              
60 1         3 my $args = [@_];
61 1         2 my $cb = $args->[-1];
62 1 50       5 if (ref $cb ne 'CODE') {
63 0         0 $cb = undef;
64             }
65             else {
66 1         3 pop @$args;
67             }
68              
69 1         5 $self->execute($cmd, $args, $cb);
70             }
71              
72             sub DESTROY {
73 0     0   0 my $self = shift;
74              
75             # Loop
76 0 0       0 return unless my $loop = $self->ioloop;
77              
78             # Cleanup connection
79 0 0       0 $loop->remove($self->{_connection})
80             if $self->{_connection};
81             }
82              
83             sub connect {
84 1     1 1 1 my $self = shift;
85              
86             # drop old connection
87 1 50       1 if ($self->connected) {
88 0         0 $self->ioloop->remove($self->{_connection});
89             }
90              
91 1         3 $self->server =~ m{^([^:]+)(:(\d+))?};
92 1         7 my $address = $1;
93 1   50     3 my $port = $3 || 6379;
94              
95 1         2 Scalar::Util::weaken $self;
96              
97 1         1 $self->{_protocol} = $self->_create_protocol;
98              
99             # connect
100 1         1 $self->{_connecting} = 1;
101             $self->{_connection} = $self->ioloop->client(
102             { address => $address,
103             port => $port
104             },
105             sub {
106 1     1   937 my ($loop, $err, $stream) = @_;
107              
108 1 50       2 if ($err) {
109 0         0 $self->error($err);
110 0         0 $self->_inform_queue;
111              
112 0         0 $self->on_error->($self);
113 0         0 return;
114             }
115              
116 1         2 delete $self->{_connecting};
117 1         2 $stream->timeout($self->timeout);
118 1         39 $self->_send_next_message;
119              
120             $stream->on(
121             read => sub {
122 4         202776 my ($stream, $chunk) = @_;
123 4         20 $self->{_protocol}->parse($chunk);
124             }
125 1         4 );
126             $stream->on(
127             close => sub {
128 1         1000730 my $str = shift;
129 1   50     20 $self->{error} ||= 'disconnected';
130 1         4 $self->_inform_queue;
131              
132 1         3 delete $self->{_message_queue};
133              
134 1         2 delete $self->{_connecting};
135 1         3 delete $self->{_connection};
136             }
137 1         6 );
138             $stream->on(
139             error => sub {
140 0         0 my ($str, $error) = @_;
141 0         0 $self->error($error);
142 0         0 $self->_inform_queue;
143              
144 0         0 $self->on_error->($self);
145 0         0 $self->ioloop->remove($self->{_connection});
146             }
147 1         5 );
148              
149             }
150 1         13 );
151              
152 1         140 return $self;
153             }
154              
155             sub connected {
156 1     1 0 1 my $self = shift;
157              
158 1         2 return $self->{_connection};
159             }
160              
161             sub execute {
162 5     5 1 2300 my ($self, $command, $args, $cb) = @_;
163              
164 5 50 33     19 if (!$cb && ref $args eq 'CODE') {
    100          
165 0         0 $cb = $args;
166 0         0 $args = [];
167             }
168             elsif (!ref $args) {
169 4         5 $args = [$args];
170             }
171              
172 5         13 unshift @$args, uc $command;
173              
174 5   100     17 my $mqueue = $self->{_message_queue} ||= [];
175 5   100     12 my $cqueue = $self->{_cb_queue} ||= [];
176              
177              
178 5         6 push @$mqueue, $args;
179 5         5 push @$cqueue, $cb;
180              
181 5 100       11 $self->connect unless $self->{_connection};
182 5         9 $self->_send_next_message;
183              
184 5         16 return $self;
185             }
186              
187             sub start {
188 1     1 1 1 my ($self) = @_;
189              
190 1         3 $self->ioloop->start;
191 1         168 return $self;
192             }
193              
194             sub stop {
195 0     0 0 0 my ($self) = @_;
196              
197 0         0 $self->ioloop->stop;
198 0         0 return $self;
199             }
200              
201             sub _create_protocol {
202 1     1   1 my $self = shift;
203              
204 1         3 my $protocol = $self->protocol_redis->new(api => 1);
205             $protocol->on_message(
206             sub {
207 3     3   120 my ($parser, $command) = @_;
208 3         12 $self->_return_command_data($command);
209             }
210 1         18 );
211              
212 1 50       4 Carp::croak(q/Protocol::Redis implementation doesn't support APIv1/)
213             unless $protocol;
214              
215 1         2 $protocol;
216             }
217              
218             sub _send_next_message {
219 6     6   6 my ($self) = @_;
220              
221 6 100 66     26 if ((my $id = $self->{_connection}) && !$self->{_connecting}) {
222 5         7 while (my $args = shift @{$self->{_message_queue}}) {
  10         161  
223 5         4 my $cmd_arg = [];
224 5         10 my $cmd = {type => '*', data => $cmd_arg};
225 5         7 foreach my $token (@$args) {
226 11 50       20 $token = Encode::encode($self->encoding, $token)
227             if $self->encoding;
228 11         394 push @$cmd_arg, {type => '$', data => $token};
229             }
230 5         13 my $message = $self->{_protocol}->encode($cmd);
231              
232 5         110 $self->ioloop->stream($id)->write($message);
233             }
234             }
235             }
236              
237              
238             sub _reencode_message {
239 3     3   4 my ($self, $message) = @_;
240              
241 3         5 my ($type, $data) = @{$message}{'type', 'data'};
  3         9  
242              
243             # Decode data
244 3 50 33     19 if ($type ne '*' && $self->encoding && $data) {
      33        
245 3         37 $data = Encode::decode($self->encoding, $data);
246             }
247              
248 3 50       179 if ($type eq '-') {
    50          
249 0         0 $self->error($data);
250 0         0 $self->on_error->($self);
251 0         0 return;
252             }
253             elsif ($type ne '*') {
254 3         8 return [$data];
255             }
256             else {
257 0         0 my $reencoded_data = [];
258 0         0 foreach my $item (@$data) {
259 0         0 my $message = $self->_reencode_message($item);
260 0         0 push @$reencoded_data, $message;
261             }
262 0         0 return $reencoded_data;
263             }
264             }
265              
266             sub _return_command_data {
267 3     3   4 my ($self, $message) = @_;
268              
269 3         8 my $data = $self->_reencode_message($message);
270              
271 3         6 my $cb = shift @{$self->{_cb_queue}};
  3         7  
272 3 50       12 $cb->($self, $data) if $cb;
273              
274             # Reset error after callback dispatching
275 3         13 $self->error(undef);
276             }
277              
278              
279             sub _inform_queue {
280 1     1   3 my ($self) = @_;
281              
282 1         2 for my $cb (@{$self->{_cb_queue}}) {
  1         5  
283 2 50       14 $cb->($self) if $cb;
284             }
285 1         33 $self->{_queue} = [];
286             }
287              
288             1;
289             __END__