File Coverage

blib/lib/Mojo/Redis/Connection.pm
Criterion Covered Total %
statement 170 178 95.5
branch 55 80 68.7
condition 29 45 64.4
subroutine 26 28 92.8
pod 4 4 100.0
total 284 335 84.7


line stmt bran cond sub pod time code
1             package Mojo::Redis::Connection;
2 18     18   146 use Mojo::Base 'Mojo::EventEmitter';
  18         45  
  18         113  
3              
4 18     18   12694 use File::Spec::Functions 'file_name_is_absolute';
  18         16724  
  18         1291  
5 18     18   9531 use Mojo::IOLoop;
  18         3242361  
  18         132  
6 18     18   1113 use Mojo::Promise;
  18         44  
  18         104  
7              
8 18     18   795 use constant DEBUG => $ENV{MOJO_REDIS_DEBUG};
  18         44  
  18         1451  
9 18   50 18   116 use constant CONNECT_TIMEOUT => $ENV{MOJO_REDIS_CONNECT_TIMEOUT} || 10;
  18         38  
  18         1235  
10 18   50 18   333 use constant SENTINELS_CONNECT_TIMEOUT => $ENV{MOJO_REDIS_SENTINELS_CONNECT_TIMEOUT} || CONNECT_TIMEOUT;
  18         41  
  18         59293  
11              
12             has encoding => sub { Carp::confess('encoding is required in constructor') };
13             has ioloop => sub { Carp::confess('ioloop is required in constructor') };
14             has protocol => sub { Carp::confess('protocol is required in constructor') };
15             has url => sub { Carp::confess('url is required in constructor') };
16              
17             sub DESTROY {
18 16     16   4363 my $self = shift;
19 16 100 66     544 $self->disconnect if defined $self->{pid} and $self->{pid} == $$;
20             }
21              
22             sub disconnect {
23 5     5 1 10 my $self = shift;
24 5         18 $self->_reject_queue;
25 5 100       32 $self->{stream}->close if $self->{stream};
26 5         185 $self->{gone_away} = 1;
27 5         275 return $self;
28             }
29              
30 25 100 100 25 1 172 sub is_connected { $_[0]->{stream} && !$_[0]->{gone_away} ? 1 : 0 }
31              
32             sub write {
33 1     1 1 2 my $self = shift;
34 1         20 push @{$self->{write}}, [$self->_encode(@_)];
  1         10  
35 1 50       68 $self->is_connected ? $self->_write : $self->_connect;
36 1         4 return $self;
37             }
38              
39             sub write_p {
40 8     8 1 15 my $self = shift;
41 8         41 my $p = Mojo::Promise->new->ioloop($self->ioloop);
42 8         346 push @{$self->{write}}, [$self->_encode(@_), $p];
  8         33  
43 8 100       365 $self->is_connected ? $self->_write : $self->_connect;
44 8         42 return $p;
45             }
46              
47             sub _connect {
48 11     11   23 my $self = shift;
49 11 50       35 return $self if $self->{id}; # Connecting
50              
51             # Cannot reuse a connection because of transaction state and other state
52 11 100       37 return $self->_reject_queue('Redis server has gone away') if $self->{gone_away};
53              
54 9   66     56 my $url = $self->{master_url} || $self->url;
55 9 100 100     129 return $self->_discover_master if !$self->{master_url} and $url->query->param('sentinel');
56              
57 8         560 Scalar::Util::weaken($self);
58 8         26 delete $self->{master_url}; # Make sure we forget master_url so we can reconnect
59 8         35 $self->protocol->on_message($self->_parse_message_cb);
60             $self->{id} = $self->ioloop->client(
61             $self->_connect_args($url, {port => 6379, timeout => CONNECT_TIMEOUT}),
62             sub {
63 6 50   6   11708 return unless $self;
64 6         19 my ($loop, $err, $stream) = @_;
65 6         20 my $close_cb = $self->_on_close_cb;
66 6 100       18 return $self->$close_cb($err) if $err;
67              
68 5         21 $stream->timeout(0);
69 5         147 $stream->on(close => $close_cb);
70 5         34 $stream->on(error => $close_cb);
71 5         37 $stream->on(read => $self->_on_read_cb);
72              
73 5 100       40 unshift @{$self->{write}}, [$self->_encode(SELECT => $url->path->[0])] if length $url->path->[0];
  2         211  
74 5 100       195 unshift @{$self->{write}}, [$self->_encode(AUTH => $url->password)] if length $url->password;
  2         46  
75 5         117 $self->{pid} = $$;
76 5         18 $self->{stream} = $stream;
77 5         95 $self->emit('connect');
78 5         152 $self->_write;
79             }
80 8         72 );
81              
82 8         1784 warn "[@{[$self->_id]}] CONNECTING $url (blocking=@{[$self->_is_blocking]})\n" if DEBUG;
83 8         28 return $self;
84             }
85              
86             sub _connect_args {
87 10     10   90 my ($self, $url, $defaults) = @_;
88 10   50     33 my %args = (address => $url->host || 'localhost');
89              
90 10 100       99 if (file_name_is_absolute $args{address}) {
91 1         12 $args{path} = delete $args{address};
92             }
93             else {
94 9   66     92 $args{port} = $url->port || $defaults->{port};
95             }
96              
97 10   50     98 $args{timeout} = $defaults->{timeout} || CONNECT_TIMEOUT;
98 10         98 return \%args;
99             }
100              
101             sub _discover_master {
102 2     2   213 my $self = shift;
103 2         9 my $url = $self->url->clone;
104 2         156 my $sentinels = $url->query->every_param('sentinel');
105 2   50     54 my $timeout = $url->query->param('sentinel_connect_timeout') || SENTINELS_CONNECT_TIMEOUT;
106              
107 2         63 $url->host_port(shift @$sentinels);
108 2         83 $self->url->query->param(sentinel => [@$sentinels, $url->host_port]); # Round-robin sentinel list
109 2         188 $self->protocol->on_message($self->_parse_message_cb);
110             $self->{id} = $self->ioloop->client(
111             $self->_connect_args($url, {port => 16379, timeout => $timeout}),
112             sub {
113 2     2   4216 my ($loop, $err, $stream) = @_;
114 2 50       7 return unless $self;
115 2 50       6 return $self->_discover_master if $err;
116              
117 2         7 $stream->timeout(0);
118 2 50       60 $stream->on(close => sub { $self->_discover_master unless $self->{master_url} });
  1         92  
119 2         18 $stream->on(error => sub { $self->_discover_master });
  0         0  
120 2         16 $stream->on(read => $self->_on_read_cb);
121              
122 2         15 $self->{stream} = $stream;
123 2         13 my $p = Mojo::Promise->new;
124 2         54 unshift @{$self->{write}}, undef; # prevent _write() from writing commands
  2         6  
125 2         3 unshift @{$self->{write}}, [$self->_encode(SENTINEL => 'get-master-addr-by-name', $self->url->host), $p];
  2         8  
126 2 50       100 unshift @{$self->{write}}, [$self->_encode(AUTH => $url->password)] if length $url->password;
  2         42  
127              
128 2         78 $self->{write_lock} = 1;
129             $p->then(
130             sub {
131 2         443 my $host_port = shift;
132 2         7 delete $self->{id};
133 2         6 delete $self->{write_lock};
134 2 100 66     13 return $self->_discover_master unless ref $host_port and @$host_port == 2;
135 1         7 $self->{master_url} = $self->url->clone->host($host_port->[0])->port($host_port->[1]);
136 1         131 $self->{stream}->close;
137 1         72 $self->_connect;
138             },
139 0         0 sub { $self->_discover_master },
140 2         16 );
141              
142 2         131 $self->_write;
143             }
144 2         18 );
145              
146 2         529 warn "[@{[$self->_id]}] SENTINEL DISCOVERY $url (blocking=@{[$self->_is_blocking]})\n" if DEBUG;
147 2         7 return $self;
148             }
149              
150             sub _encode {
151 17     17   115 my $self = shift;
152 17         81 my $encoding = $self->encoding;
153             return $self->protocol->encode({
154 17 50       103 type => '*', data => [map { +{type => '$', data => $encoding ? Mojo::Util::encode($encoding, $_) : $_} } @_]
  34         333  
155             });
156             }
157              
158 0 0   0   0 sub _id { $_[0]->{id} || '0' }
159              
160 0 0   0   0 sub _is_blocking { shift->ioloop eq Mojo::IOLoop->singleton ? 0 : 1 }
161              
162             sub _on_close_cb {
163 6     6   13 my $self = shift;
164              
165 6         17 Scalar::Util::weaken($self);
166             return sub {
167 5 50   5   2461 return unless $self;
168 5         13 my ($stream, $err) = @_;
169 5         20 delete $self->{$_} for qw(id stream);
170 5         11 $self->{gone_away} = 1;
171 5         18 $self->_reject_queue($err);
172 5 100       31 $self->emit('close') if @_ == 1;
173 5 0 50     61 warn qq([@{[$self->_id]}] @{[$err ? "ERROR $err" : "CLOSED"]}\n) if $self and DEBUG;
  0 50       0  
  0         0  
174 6         33 };
175             }
176              
177             sub _on_read_cb {
178 7     7   16 my $self = shift;
179              
180 7         44 Scalar::Util::weaken($self);
181             return sub {
182 4 50   4   5705 return unless $self;
183 4         14 my ($stream, $chunk) = @_;
184 4         9 do { local $_ = $chunk; s!\r\n!\\r\\n!g; warn "[@{[$self->_id]}] >>> ($_)\n" } if DEBUG;
185 4         23 $self->protocol->parse($chunk);
186 7         69 };
187             }
188              
189             sub _parse_message_cb {
190 10     10   60 my $self = shift;
191              
192 10         44 Scalar::Util::weaken($self);
193             return sub {
194 8     8   466 my ($protocol, @messages) = @_;
195 8         23 my $encoding = $self->encoding;
196 8 100       54 $self->_write unless $self->{write_lock};
197              
198             my $unpack = sub {
199 10         17 my @res;
200              
201 10         28 while (my $m = shift @_) {
202 12 50 66     153 if ($m->{type} eq '-') {
    50 66        
    100 66        
    100          
203 0         0 return $m->{data}, undef;
204             }
205             elsif ($m->{type} eq ':') {
206 0         0 push @res, 0 + $m->{data};
207             }
208             elsif ($m->{type} eq '*' and ref $m->{data} eq 'ARRAY') {
209 2         4 my ($err, $res) = __SUB__->(@{$m->{data}});
  2         11  
210 2 50       29 return $err if defined $err;
211 2         10 push @res, $res;
212             }
213              
214             # Only bulk string replies can contain binary-safe encoded data
215             elsif ($m->{type} eq '$' and $encoding and defined $m->{data}) {
216 9         35 push @res, Mojo::Util::decode($encoding, $m->{data});
217             }
218             else {
219 1         4 push @res, $m->{data};
220             }
221             }
222              
223 10         161 return undef, \@res;
224 8         47 };
225              
226 8         25 my ($err, $res) = $unpack->(@messages);
227 8 50       16 my $p = shift @{$self->{waiting} || []};
  8         22  
228 8 0       20 return $p ? $p->reject($err) : $self->emit(error => $err) unless $res;
    50          
229 8 100       56 return $p ? $p->resolve($res->[0]) : $self->emit(response => $res->[0]);
230 10         85 };
231             }
232              
233             sub _reject_queue {
234 12     12   25 my ($self, $err) = @_;
235 12         23 state $default = 'Premature connection close';
236 12 100 33     18 for my $p (@{delete $self->{waiting} || []}) { $p and $p->reject($err || $default) }
  12 100       54  
  4         20  
237 12 100 66     271 for my $i (@{delete $self->{write} || []}) { $i->[1] and $i->[1]->reject($err || $default) }
  12 50       51  
  4         25  
238 12         461 return $self;
239             }
240              
241             sub _write {
242 12     12   19 my $self = shift;
243              
244 12         32 while (my $op = shift @{$self->{write}}) {
  24         401  
245 12         32 my $loop = $self->ioloop;
246 12         59 do { local $_ = $op->[0]; s!\r\n!\\r\\n!g; warn "[@{[$self->_id]}] <<< ($_)\n" } if DEBUG;
247 12         21 push @{$self->{waiting}}, $op->[1];
  12         32  
248 12         40 $self->{stream}->write($op->[0]);
249             }
250             }
251              
252             1;
253              
254             =encoding utf8
255              
256             =head1 NAME
257              
258             Mojo::Redis::Connection - Low level connection class for talking to Redis
259              
260             =head1 SYNOPSIS
261              
262             use Mojo::Redis::Connection;
263              
264             my $conn = Mojo::Redis::Connection->new(
265             ioloop => Mojo::IOLoop->singleton,
266             protocol => Protocol::Redis::Faster->new(api => 1),
267             url => Mojo::URL->new("redis://localhost"),
268             );
269              
270             $conn->write_p("GET some_key")->then(sub { print "some_key=$_[0]" })->wait;
271              
272             =head1 DESCRIPTION
273              
274             L<Mojo::Redis::Connection> is a low level driver for writing and reading data
275             from a Redis server.
276              
277             You probably want to use L<Mojo::Redis> instead of this class.
278              
279             =head1 EVENTS
280              
281             =head2 close
282              
283             $cb = $conn->on(close => sub { my ($conn) = @_; });
284              
285             Emitted when the connection to the redis server gets closed.
286              
287             =head2 connect
288              
289             $cb = $conn->on(connect => sub { my ($conn) = @_; });
290              
291             Emitted right after a connection is established to the Redis server, but
292             after the AUTH and SELECT commands are queued.
293              
294             =head2 error
295              
296             $cb = $conn->on(error => sub { my ($conn, $error) = @_; });
297              
298             Emitted if there's a connection error or the Redis server emits an error, and
299             there's not a promise to handle the message.
300              
301             =head2 response
302              
303             $cb = $conn->on(response => sub { my ($conn, $res) = @_; });
304              
305             Emitted when receiving a message from the Redis server.
306              
307             =head1 ATTRIBUTES
308              
309             =head2 encoding
310              
311             $str = $conn->encoding;
312             $conn = $conn->encoding("UTF-8");
313              
314             Holds the character encoding to use for data from/to Redis. Set to C<undef>
315             to disable encoding/decoding data. Without an encoding set, Redis expects and
316             returns bytes. See also L<Mojo::Redis/encoding>.
317              
318             =head2 ioloop
319              
320             $loop = $conn->ioloop;
321             $conn = $conn->ioloop(Mojo::IOLoop->new);
322              
323             Holds an instance of L<Mojo::IOLoop>.
324              
325             =head2 protocol
326              
327             $protocol = $conn->protocol;
328             $conn = $conn->protocol(Protocol::Redis::XS->new(api => 1));
329              
330             Holds a protocol object, such as L<Protocol::Redis::XS> that is used to
331             generate and parse Redis messages.
332              
333             =head2 url
334              
335             $url = $conn->url;
336             $conn = $conn->url(Mojo::URL->new->host("/tmp/redis.sock")->path("/5"));
337             $conn = $conn->url("redis://localhost:6379/1");
338              
339             =head1 METHODS
340              
341             =head2 disconnect
342              
343             $conn = $conn->disconnect;
344              
345             Used to disconnect from the Redis server.
346              
347             =head2 is_connected
348              
349             $bool = $conn->is_connected;
350              
351             True if a connection to the Redis server is established.
352              
353             =head2 write
354              
355             $conn = $conn->write(@command_and_args);
356              
357             Used to write a message to the redis server. Calling this method should result
358             in either a L</response> or L</error> event.
359              
360             This is useful when replies should be handled through events instead of a promise.
361              
362             =head2 write_p
363              
364             $promise = $conn->write_p(@command_and_args);
365              
366             Will write a command to the Redis server and establish a connection if not
367             already connected and returns a L<Mojo::Promise>.
368              
369             =head1 SEE ALSO
370              
371             L<Mojo::Redis>
372              
373             =cut