File Coverage

blib/lib/Redis/Fast.pm
Criterion Covered Total %
statement 95 264 35.9
branch 20 112 17.8
condition 16 32 50.0
subroutine 20 41 48.7
pod 0 11 0.0
total 151 460 32.8


line stmt bran cond sub pod time code
1             package Redis::Fast;
2              
3             BEGIN {
4 38     38   2385510 use XSLoader;
  38         387  
  38         1487  
5 38     38   108 our $VERSION = '0.34';
6 38         22324 XSLoader::load __PACKAGE__, $VERSION;
7             }
8              
9 38     38   258 use warnings;
  38         64  
  38         1129  
10 38     38   174 use strict;
  38         77  
  38         1078  
11              
12 38     38   198 use Carp qw/confess/;
  38         59  
  38         2019  
13 38     38   19881 use Encode;
  38         339125  
  38         2824  
14 38     38   2952 use Try::Tiny;
  38         12037  
  38         1701  
15 38     38   210 use Scalar::Util qw(weaken);
  38         67  
  38         1432  
16              
17 38     38   13121 use Redis::Fast::Sentinel;
  38         95  
  38         60249  
18              
19              
20             # small utilities for handling host and port
21             sub _join_host_port {
22 0     0   0 my ($host, $port) = @_;
23 0 0 0     0 return "[$host]:$port" if $host =~ /:/ || $host =~ /%/;
24 0         0 return "$host:$port";
25             }
26             sub _split_host_port {
27 4     4   12 my $hostport = shift;
28 4 50       36 if ($hostport =~ /\A\[([^\]]+)\]:([0-9]+)\z/) {
29 0         0 return $1, $2;
30             }
31 4         20 return split /:/, $hostport;
32             }
33              
34             sub _new_on_connect_cb {
35 70     70   325 my ($self, $on_conn, $password, $name) = @_;
36 70         406 weaken $self;
37             my $handler = sub {
38             # If we are in PubSub mode we shouldn't perform any command besides
39             # (p)(un)subscribe
40 68 50   68   330 if (! $self->is_subscriber) {
41             defined $name
42             and try {
43 0         0 my $n = $name;
44 0 0       0 $n = $n->($self) if ref($n) eq 'CODE';
45 0 0       0 $self->client_setname($n) if defined $n;
46 68 50       179 };
47 68         204 my $data = $self->__get_data;
48             defined $data->{current_database}
49 68 50       488 and $self->select($data->{current_database});
50             }
51              
52 68         359 my $subscribers = $self->__get_data->{subscribers};
53 68         610 $self->__get_data->{subscribers} = {};
54 68         603 $self->__get_data->{cbs} = undef;
55 68         131 foreach my $topic (CORE::keys(%{$subscribers})) {
  68         252  
56 0 0       0 if ($topic =~ /(p?message):(.*)$/ ) {
57 0         0 my ($key, $channel) = ($1, $2);
58 0         0 my $subs = $subscribers->{$topic};
59 0 0       0 if ($key eq 'message') {
60 0         0 $self->__subscription_cmd('', 0, subscribe => $channel, $_) for @$subs;
61             } else {
62 0         0 $self->__subscription_cmd('p', 0, psubscribe => $channel, $_) for @$subs;
63             }
64             }
65             }
66              
67 68 50       406 defined $on_conn
68             and $on_conn->($self);
69 70         1947 };
70              
71             return sub {
72 68     68   563 my $reconnect_stash = $self->__get_reconnect;
73 68 50       242 if(defined $password) {
74 0         0 my $err;
75 0         0 $self->__set_reconnect(0);
76             try {
77 0         0 $self->auth($password);
78             } catch {
79 0         0 $err = $_;
80 0         0 };
81 0 0       0 if(defined $err) {
82 0 0       0 if($err =~ /ERR invalid password|WRONGPASS invalid username-password pair/) {
83             # password setting is incorrect, no need to reconnect
84 0         0 die("Redis server refused password");
85             } else {
86             # it might be network error
87             # invoke reconnect
88 0         0 $self->__set_reconnect($reconnect_stash);
89 0         0 return ;
90             }
91             }
92             }
93              
94             try {
95             # disable reconnection while executing on_connect handler
96 68         12481 $self->__set_reconnect(0);
97 68         220 $handler->();
98             } catch {
99 0         0 $self->quit();
100             } finally {
101 68         2193 $self->__set_reconnect($reconnect_stash);
102 68         3143 };
103 70         2449 };
104             }
105              
106             sub _new_reconnect_on_error_cb {
107 70     70   312 my ($self, $reconnect_on_error) = @_;
108 70         255 weaken $self;
109              
110 70 50       376 if ($reconnect_on_error) {
111             return sub {
112             # The unit should be second and the type should be double.
113             # -1 is a special value, it means that we do not reconnect.
114 0     0   0 my $next_reconnect_interval = $reconnect_on_error->(@_);
115 0 0       0 if ($next_reconnect_interval < -1) {
116 0         0 warn "reconnect_on_error must not return a number less than -1";
117              
118             # Reset a next_reconnect_interval and do not reconnect.
119 0         0 $next_reconnect_interval = -1;
120             }
121              
122             # Wait until next_reconnect_interval seconds elapse.
123 0         0 $self->__set_next_reconnect_on_error_at($next_reconnect_interval);
124              
125 0         0 my $need_reconnect = 0;
126 0 0       0 if (-1 < $next_reconnect_interval) {
127 0         0 $need_reconnect = 1;
128             }
129 0         0 return $need_reconnect;
130 0         0 };
131             } else {
132 70         272 return;
133             }
134             }
135              
136             sub new {
137 70     70 0 437894 my $class = shift;
138 70         999 my %args = @_;
139 70         6095 my $self = $class->_new;
140              
141             ## Deal with REDIS_SERVER ENV
142 70 0 33     622 if ($ENV{REDIS_SERVER} && !$args{sock} && !$args{server}) {
      33        
143 0 0       0 if ($ENV{REDIS_SERVER} =~ m!^/!) {
    0          
    0          
144 0         0 $args{sock} = $ENV{REDIS_SERVER};
145             }
146             elsif ($ENV{REDIS_SERVER} =~ m!^unix:(.+)!) {
147 0         0 $args{sock} = $1;
148             }
149             elsif ($ENV{REDIS_SERVER} =~ m!^(tcp:)?(.+)!) {
150 0         0 $args{server} = $2;
151             }
152             }
153              
154 70         743 my $on_conn = $args{on_connect};
155 70         195 my $password = $args{password};
156 70         419 my $name = $args{name};
157 70         477 $self->__set_on_connect($self->_new_on_connect_cb($on_conn, $password, $name));
158             $self->__set_data({
159             subscribers => {},
160             sentinels_cnx_timeout => $args{sentinels_cnx_timeout},
161             sentinels_read_timeout => $args{sentinels_read_timeout},
162             sentinels_write_timeout => $args{sentinels_write_timeout},
163             no_sentinels_list_update => $args{no_sentinels_list_update},
164 70         1877 });
165              
166 70 100       343 if ($args{sock}) {
    50          
167 66         747 $self->__connection_info_unix($args{sock});
168             } elsif ($args{sentinels}) {
169 0         0 my $sentinels = $args{sentinels};
170 0 0       0 ref $sentinels eq 'ARRAY'
171             or croak("'sentinels' param must be an ArrayRef");
172             defined($self->__get_data->{service} = $args{service})
173 0 0       0 or croak("Need 'service' name when using 'sentinels'!");
174 0         0 $self->__get_data->{sentinels} = $sentinels;
175             my $on_build_sock = sub {
176 0     0   0 my $data = $self->__get_data;
177 0         0 my $sentinels = $data->{sentinels};
178              
179             # try to connect to a sentinel
180 0         0 my $status;
181 0         0 foreach my $sentinel_address (@$sentinels) {
182 0 0       0 my $sentinel = eval {
183             Redis::Fast::Sentinel->new(
184             server => $sentinel_address,
185             cnx_timeout => ( exists $data->{sentinels_cnx_timeout}
186             ? $data->{sentinels_cnx_timeout} : 0.1),
187             read_timeout => ( exists $data->{sentinels_read_timeout}
188             ? $data->{sentinels_read_timeout} : 1 ),
189             write_timeout => ( exists $data->{sentinels_write_timeout}
190 0 0       0 ? $data->{sentinels_write_timeout} : 1 ),
    0          
    0          
191             )
192             } or next;
193 0         0 my $server_address = $sentinel->get_service_address($data->{service});
194 0 0 0     0 defined $server_address
195             or $status ||= "Sentinels don't know this service",
196             next;
197 0 0       0 $server_address eq 'IDONTKNOW'
198             and $status = "service is configured in one Sentinel, but was never reached",
199             next;
200              
201             # we found the service, set the server
202 0         0 my ($server, $port) = _split_host_port $server_address;
203 0         0 $self->__connection_info($server, $port);
204              
205 0 0       0 if (! $data->{no_sentinels_list_update} ) {
206             # move the elected sentinel at the front of the list and add
207             # additional sentinels
208 0         0 my $idx = 2;
209 0         0 my %h = ( ( map { $_ => $idx++ } @{$data->{sentinels}}),
  0         0  
  0         0  
210             $sentinel_address => 1,
211             );
212              
213             $data->{sentinels} = [
214 0         0 ( sort { $h{$a} <=> $h{$b} } CORE::keys(%h) ), # sorted existing sentinels,
215 0         0 grep { ! $h{$_}; } # list of unknown
216             map {
217 0         0 my $s = +{ @$_ };
218 0         0 _join_host_port($s->{ip}, $s->{port});
219             } # ip:port of
220             $sentinel->sentinel( # sentinels
221             sentinels => $data->{service} # for this service
222             )
223 0         0 ];
224             }
225             }
226 0         0 };
227 0         0 $self->__set_on_build_sock($on_build_sock);
228             } else {
229 4   50     30 my ($server, $port) = _split_host_port($args{server} || '127.0.0.1:6379');
230 4         28 $self->__connection_info($server, $port);
231             }
232              
233             #$self->{is_subscriber} = 0;
234             #$self->{subscribers} = {};
235 70   100     478 $self->__set_reconnect($args{reconnect} || 0);
236 70   100     1132 $self->__set_every($args{every} || 1000);
237 70 50 33     578 $self->__set_debug(($args{debug} || $ENV{REDIS_DEBUG}) ? 1 : 0);
238 70   100     899 $self->__set_cnx_timeout($args{cnx_timeout} || -1);
239 70   100     528 $self->__set_read_timeout($args{read_timeout} || -1);
240 70   100     372 $self->__set_write_timeout($args{write_timeout} || -1);
241              
242 70 50       777 if (my $cb = $self->_new_reconnect_on_error_cb($args{reconnect_on_error})) {
243 0         0 $self->__set_reconnect_on_error($cb);
244             }
245              
246 70 100       376630 $self->connect unless $args{no_auto_connect_on_new};
247              
248 69         2686 return $self;
249             }
250              
251              
252              
253             ### Deal with common, general case, Redis commands
254             our $AUTOLOAD;
255              
256             sub AUTOLOAD {
257 13     13   3134 my $command = $AUTOLOAD;
258 13         245 $command =~ s/.*://;
259 13         65 my @command = split /_/, uc $command;
260              
261             my $method = sub {
262 68     68   1779 my $self = shift;
263 68         843 $self->__is_valid_command($command);
264 68         1286691 my ($ret, $error) = $self->__std_cmd(@command, @_);
265 68 100       8309 confess "[$command] $error, " if defined $error;
266 54 100 66     1679 return (wantarray && ref $ret eq 'ARRAY') ? @$ret : $ret;
267 13         237 };
268              
269             # Save this method for future calls
270 38     38   433 no strict 'refs';
  38         92  
  38         47822  
271 13         262 *$AUTOLOAD = $method;
272              
273 13         136 goto $method;
274             }
275              
276             sub __with_reconnect {
277 0     0   0 my ($self, $cb) = @_;
278              
279 0         0 confess "not implemented";
280             }
281              
282              
283             ### Commands with extra logic
284              
285             sub keys {
286 0     0 0 0 my $self = shift;
287 0         0 $self->__is_valid_command('keys');
288 0         0 my ($ret, $error) = $self->__keys(@_);
289 0 0       0 confess "[keys] $error, " if defined $error;
290 0 0       0 return $ret unless ref $ret eq 'ARRAY';
291 0         0 return @$ret;
292             }
293              
294             sub ping {
295 0     0 0 0 my $self = shift;
296 0         0 $self->__is_valid_command('ping');
297 0 0       0 return unless $self->__sock;
298             return scalar try {
299 0     0   0 my ($ret, $error) = $self->__std_cmd('ping');
300 0 0       0 return if defined $error;
301 0         0 return $ret;
302             } catch {
303 0     0   0 return ;
304 0         0 };
305             }
306              
307             sub info {
308 3     3 0 105 my $self = shift;
309 3         7 $self->__is_valid_command('info');
310 3         10 my ($ret, $error) = $self->__info(@_);
311 3 50       329 confess "[info] $error, " if defined $error;
312 0 0       0 return $ret unless ref $ret eq 'ARRAY';
313 0         0 return @$ret;
314             }
315              
316             sub quit {
317 0     0 0 0 my $self = shift;
318 0         0 $self->__is_valid_command('quit');
319 0         0 $self->__quit(@_);
320             }
321              
322             sub shutdown {
323 0     0 0 0 my $self = shift;
324 0         0 $self->__is_valid_command('shutdown');
325 0         0 $self->__shutdown(@_);
326             }
327              
328             sub select {
329 0     0 0 0 my $self = shift;
330 0         0 my $database = shift;
331 0         0 $self->__is_valid_command('select');
332 0         0 my ($ret, $error) = $self->__std_cmd('SELECT', $database, @_);
333 0 0       0 confess "[SELECT] $error, " if defined $error;
334 0         0 $self->__get_data->{current_database} = $database;
335 0         0 return $ret;
336             }
337              
338             sub __subscription_cmd {
339 0     0   0 my $self = shift;
340 0         0 my $pr = shift;
341 0         0 my $unsub = shift;
342 0         0 my $command = shift;
343 0         0 my $cb = pop;
344 0         0 weaken $self;
345              
346 0 0       0 confess("Missing required callback in call to $command(), ")
347             unless ref($cb) eq 'CODE';
348              
349 0         0 $self->wait_all_responses;
350              
351 0         0 while($self->__get_data->{cbs}) {
352 0         0 $self->__wait_for_event(1);
353             }
354              
355 0         0 my @subs = @_;
356 0 0       0 @subs = $self->__process_unsubscribe_requests($cb, $pr, @subs)
357             if $unsub;
358              
359 0 0       0 if(@subs) {
360 0         0 $self->__get_data->{cbs} = { map { ("${pr}message:$_" => $cb) } @subs };
  0         0  
361 0         0 for my $sub(@subs) {
362 0         0 $self->__send_subscription_cmd(
363             $command,
364             $sub,
365             $self->__subscription_callbak,
366             );
367             }
368 0         0 while($self->__get_data->{cbs}) {
369 0         0 $self->__wait_for_event(1);
370             }
371             }
372             }
373              
374             sub __subscription_callbak {
375 0     0   0 my $self = shift;
376 0         0 my $cb = $self->__get_data->{callback};
377 0 0       0 return $cb if $cb;
378              
379 0         0 weaken $self;
380             $cb = sub {
381 0     0   0 my $cbs = $self->__get_data->{cbs};
382 0 0       0 if($cbs) {
383 0         0 $self->__process_subscription_changes($cbs, @_);
384 0 0       0 unless(%$cbs) {
385 0         0 $self->__get_data->{cbs} = undef;
386             }
387             } else {
388 0         0 $self->__process_pubsub_msg(@_);
389             }
390 0         0 };
391              
392 0         0 $self->__get_data->{callback} = $cb;
393 0         0 return $cb;
394             }
395              
396 0     0 0 0 sub subscribe { shift->__subscription_cmd('', 0, subscribe => @_) }
397 0     0 0 0 sub psubscribe { shift->__subscription_cmd('p', 0, psubscribe => @_) }
398 0     0 0 0 sub unsubscribe { shift->__subscription_cmd('', 1, unsubscribe => @_) }
399 0     0 0 0 sub punsubscribe { shift->__subscription_cmd('p', 1, punsubscribe => @_) }
400              
401             sub __process_unsubscribe_requests {
402 0     0   0 my ($self, $cb, $pr, @unsubs) = @_;
403 0         0 my $subs = $self->__get_data->{subscribers};
404              
405 0         0 my @subs_to_unsubscribe;
406 0         0 for my $sub (@unsubs) {
407 0         0 my $key = "${pr}message:$sub";
408 0 0 0     0 next unless $subs->{$key} && @{ $subs->{$key} };
  0         0  
409 0         0 my $cbs = $subs->{$key} = [grep { $_ ne $cb } @{ $subs->{$key} }];
  0         0  
  0         0  
410 0 0       0 next if @$cbs;
411              
412 0         0 delete $subs->{$key};
413 0         0 push @subs_to_unsubscribe, $sub;
414             }
415 0         0 return @subs_to_unsubscribe;
416             }
417              
418             sub __process_subscription_changes {
419 0     0   0 my ($self, $expected, $m, $error) = @_;
420 0         0 my $subs = $self->__get_data->{subscribers};
421              
422             ## Deal with pending PUBLISH'ed messages
423 0 0       0 if ($m->[0] =~ /^p?message$/) {
424 0         0 $self->__process_pubsub_msg($m);
425 0         0 return ;
426             }
427              
428 0         0 my ($key, $unsub) = $m->[0] =~ m/^(p)?(un)?subscribe$/;
429 0         0 $key .= "message:$m->[1]";
430 0         0 my $cb = delete $expected->{$key};
431              
432 0 0       0 push @{ $subs->{$key} }, $cb unless $unsub;
  0         0  
433             }
434              
435             sub __process_pubsub_msg {
436 0     0   0 my ($self, $m) = @_;
437 0         0 my $subs = $self->__get_data->{subscribers};
438              
439 0         0 my $sub = $m->[1];
440 0         0 my $cbid = "$m->[0]:$sub";
441 0         0 my $data = pop @$m;
442 0 0       0 my $topic = defined $m->[2] ? $m->[2] : $sub;
443              
444 0 0       0 if (!exists $subs->{$cbid}) {
445 0         0 warn "Message for topic '$topic' ($cbid) without expected callback, ";
446 0         0 return 0;
447             }
448              
449 0         0 $_->($data, $topic, $sub) for @{ $subs->{$cbid} };
  0         0  
450              
451 0         0 return 1;
452              
453             }
454              
455             sub __is_valid_command {
456 71     71   258 my ($self, $cmd) = @_;
457              
458 71 50       396 confess("Cannot use command '$cmd' while in SUBSCRIBE mode, ")
459             if $self->is_subscriber;
460             }
461              
462             1; # End of Redis.pm
463              
464             __END__