File Coverage

blib/lib/Redis/ClusterRider.pm
Criterion Covered Total %
statement 277 292 94.8
branch 87 110 79.0
condition 12 18 66.6
subroutine 32 33 96.9
pod 6 6 100.0
total 414 459 90.2


line stmt bran cond sub pod time code
1             package Redis::ClusterRider;
2              
3 5     5   352003 use 5.008000;
  5         41  
4 5     5   27 use strict;
  5         8  
  5         131  
5 5     5   24 use warnings;
  5         9  
  5         131  
6 5     5   22 use base qw( Exporter );
  5         8  
  5         762  
7              
8             our $VERSION = '0.26';
9              
10 5     5   2930 use Redis;
  5         488822  
  5         188  
11 5     5   2759 use List::MoreUtils qw( bsearch );
  5         59355  
  5         36  
12 5     5   4226 use Scalar::Util qw( looks_like_number weaken );
  5         13  
  5         310  
13 5     5   26 use Time::HiRes;
  5         11  
  5         42  
14 5     5   435 use Carp qw( carp croak );
  5         9  
  5         260  
15              
16             BEGIN {
17 5     5   150 our @EXPORT_OK = qw( crc16 hash_slot );
18             }
19              
20             use constant {
21 5         11585 D_REFRESH_INTERVAL => 15,
22             MAX_SLOTS => 16384,
23             EOL => "\r\n",
24 5     5   26 };
  5         9  
25              
26             my @CRC16_TAB = (
27             0x0000, 0x1021, 0x2042, 0x3063, 0x4084, 0x50a5, 0x60c6, 0x70e7,
28             0x8108, 0x9129, 0xa14a, 0xb16b, 0xc18c, 0xd1ad, 0xe1ce, 0xf1ef,
29             0x1231, 0x0210, 0x3273, 0x2252, 0x52b5, 0x4294, 0x72f7, 0x62d6,
30             0x9339, 0x8318, 0xb37b, 0xa35a, 0xd3bd, 0xc39c, 0xf3ff, 0xe3de,
31             0x2462, 0x3443, 0x0420, 0x1401, 0x64e6, 0x74c7, 0x44a4, 0x5485,
32             0xa56a, 0xb54b, 0x8528, 0x9509, 0xe5ee, 0xf5cf, 0xc5ac, 0xd58d,
33             0x3653, 0x2672, 0x1611, 0x0630, 0x76d7, 0x66f6, 0x5695, 0x46b4,
34             0xb75b, 0xa77a, 0x9719, 0x8738, 0xf7df, 0xe7fe, 0xd79d, 0xc7bc,
35             0x48c4, 0x58e5, 0x6886, 0x78a7, 0x0840, 0x1861, 0x2802, 0x3823,
36             0xc9cc, 0xd9ed, 0xe98e, 0xf9af, 0x8948, 0x9969, 0xa90a, 0xb92b,
37             0x5af5, 0x4ad4, 0x7ab7, 0x6a96, 0x1a71, 0x0a50, 0x3a33, 0x2a12,
38             0xdbfd, 0xcbdc, 0xfbbf, 0xeb9e, 0x9b79, 0x8b58, 0xbb3b, 0xab1a,
39             0x6ca6, 0x7c87, 0x4ce4, 0x5cc5, 0x2c22, 0x3c03, 0x0c60, 0x1c41,
40             0xedae, 0xfd8f, 0xcdec, 0xddcd, 0xad2a, 0xbd0b, 0x8d68, 0x9d49,
41             0x7e97, 0x6eb6, 0x5ed5, 0x4ef4, 0x3e13, 0x2e32, 0x1e51, 0x0e70,
42             0xff9f, 0xefbe, 0xdfdd, 0xcffc, 0xbf1b, 0xaf3a, 0x9f59, 0x8f78,
43             0x9188, 0x81a9, 0xb1ca, 0xa1eb, 0xd10c, 0xc12d, 0xf14e, 0xe16f,
44             0x1080, 0x00a1, 0x30c2, 0x20e3, 0x5004, 0x4025, 0x7046, 0x6067,
45             0x83b9, 0x9398, 0xa3fb, 0xb3da, 0xc33d, 0xd31c, 0xe37f, 0xf35e,
46             0x02b1, 0x1290, 0x22f3, 0x32d2, 0x4235, 0x5214, 0x6277, 0x7256,
47             0xb5ea, 0xa5cb, 0x95a8, 0x8589, 0xf56e, 0xe54f, 0xd52c, 0xc50d,
48             0x34e2, 0x24c3, 0x14a0, 0x0481, 0x7466, 0x6447, 0x5424, 0x4405,
49             0xa7db, 0xb7fa, 0x8799, 0x97b8, 0xe75f, 0xf77e, 0xc71d, 0xd73c,
50             0x26d3, 0x36f2, 0x0691, 0x16b0, 0x6657, 0x7676, 0x4615, 0x5634,
51             0xd94c, 0xc96d, 0xf90e, 0xe92f, 0x99c8, 0x89e9, 0xb98a, 0xa9ab,
52             0x5844, 0x4865, 0x7806, 0x6827, 0x18c0, 0x08e1, 0x3882, 0x28a3,
53             0xcb7d, 0xdb5c, 0xeb3f, 0xfb1e, 0x8bf9, 0x9bd8, 0xabbb, 0xbb9a,
54             0x4a75, 0x5a54, 0x6a37, 0x7a16, 0x0af1, 0x1ad0, 0x2ab3, 0x3a92,
55             0xfd2e, 0xed0f, 0xdd6c, 0xcd4d, 0xbdaa, 0xad8b, 0x9de8, 0x8dc9,
56             0x7c26, 0x6c07, 0x5c64, 0x4c45, 0x3ca2, 0x2c83, 0x1ce0, 0x0cc1,
57             0xef1f, 0xff3e, 0xcf5d, 0xdf7c, 0xaf9b, 0xbfba, 0x8fd9, 0x9ff8,
58             0x6e17, 0x7e36, 0x4e55, 0x5e74, 0x2e93, 0x3eb2, 0x0ed1, 0x1ef0,
59             );
60              
61             my %PREDEFINED_CMDS = (
62             sort => { readonly => 0, key_pos => 1 },
63             zunionstore => { readonly => 0, key_pos => 1 },
64             zinterstore => { readonly => 0, key_pos => 1 },
65             eval => { readonly => 0, movablekeys => 1, key_pos => 0 },
66             evalsha => { readonly => 0, movablekeys => 1, key_pos => 0 },
67             );
68              
69             $Carp::Internal{ (__PACKAGE__) }++;
70              
71              
72             sub new {
73 12     12 1 6354 my $class = shift;
74 12         35 my %params = @_;
75              
76 12         26 my $self = bless {}, $class;
77              
78 12 100       38 unless ( defined $params{startup_nodes} ) {
79 1         167 croak 'Startup nodes not specified';
80             }
81 11 100       34 unless ( ref( $params{startup_nodes} ) eq 'ARRAY' ) {
82 1         75 croak 'Startup nodes must be specified as array reference';
83             }
84 10 100       14 unless ( @{ $params{startup_nodes} } ) {
  10         29  
85 1         75 croak 'Specified empty list of startup nodes';
86             }
87              
88 9 50       31 if ( $params{fallback} ) {
89 0 0       0 if ( $params{lazy} ) {
90 0         0 carp 'Fallback mode revokes lazy for ' . $params{startup_nodes}->[0];
91             }
92              
93 0         0 my $node = Redis->new(%params, server => $params{startup_nodes}->[0]);
94 0 0       0 eval { $node->cluster_info(); 1 } or return $node;
  0         0  
  0         0  
95             }
96              
97 9         36 $self->{startup_nodes} = $params{startup_nodes};
98 9         18 $self->{allow_slaves} = $params{allow_slaves};
99 9         15 $self->{lazy} = $params{lazy};
100 9         34 $self->refresh_interval( $params{refresh_interval} );
101              
102 7         13 $self->{on_node_connect} = $params{on_node_connect};
103 7         12 $self->{on_node_error} = $params{on_node_error};
104              
105 7         10 my %node_params;
106 7         17 foreach my $name ( qw( conservative_reconnect cnx_timeout read_timeout
107             write_timeout password username name debug ) )
108             {
109 56 100       100 next unless defined $params{$name};
110 6         11 $node_params{$name} = $params{$name};
111             }
112 7         14 $self->{_node_params} = \%node_params;
113              
114 7         15 $self->{_nodes_pool} = undef;
115 7         16 $self->{_nodes} = undef;
116 7         12 $self->{_master_nodes} = undef;
117 7         12 $self->{_slots} = undef;
118 7         13 $self->{_commands} = undef;
119 7         12 $self->{_refresh_timestamp} = undef;
120              
121 7 100       17 unless ( $self->{lazy} ) {
122 5         15 $self->_init;
123             }
124              
125 7         34 return $self;
126             }
127              
128             sub run_command {
129 1     1 1 444 my $self = shift;
130 1         1 my $cmd_name = shift;
131              
132 1         6 return $self->_route( $cmd_name, [ @_ ] );
133             }
134              
135             sub nodes {
136 4     4 1 15580 my $self = shift;
137 4         8 my $key = shift;
138 4         5 my $allow_slaves = shift;
139              
140 4 50       13 unless ( defined $self->{_slots} ) {
141 0         0 $self->_init;
142             }
143              
144 4         5 my $slot;
145 4 100       10 if ( defined $key ) {
146 2         6 $slot = hash_slot($key);
147             }
148              
149 4         13 my $nodes = $self->_nodes( $slot, $allow_slaves );
150              
151             return wantarray
152 4         15 ? @{ $self->{_nodes_pool} }{ @{$nodes} }
  4         6  
153 4 50       9 : $self->{_nodes_pool}{ $nodes->[0] };
154             }
155              
156             sub refresh_interval {
157 16     16 1 1193 my $self = shift;
158              
159 16 100       38 if (@_) {
160 13         18 my $seconds = shift;
161              
162 13 100       31 if ( defined $seconds ) {
163 10 100 100     59 if ( !looks_like_number($seconds) || $seconds < 0 ) {
164 4         298 croak qq{"refresh_interval" must be a positive number};
165             }
166 6         13 $self->{refresh_interval} = $seconds;
167             }
168             else {
169 3         7 $self->{refresh_interval} = D_REFRESH_INTERVAL;
170             }
171             }
172              
173 12         29 return $self->{refresh_interval};
174             }
175              
176             sub crc16 {
177 11     11 1 95 my $data = shift;
178              
179 11 50       37 unless ( utf8::downgrade( $data, 1 ) ) {
180 0         0 utf8::encode($data);
181             }
182              
183 11         17 my $crc = 0;
184 11         31 foreach my $char ( split //, $data ) {
185 39         74 $crc = ( $crc << 8 & 0xff00 )
186             ^ $CRC16_TAB[ ( ( $crc >> 8 ) ^ ord($char) ) & 0x00ff ];
187             }
188              
189 11         32 return $crc;
190             }
191              
192             sub hash_slot {
193 10     10 1 680 my $key = shift;
194              
195 10         12 my $hashtag = $key;
196              
197 10 100       37 if ( $key =~ m/\{([^}]*?)\}/ ) {
198 1 50       5 if ( length $1 > 0 ) {
199 1         3 $hashtag = $1;
200             }
201             }
202              
203 10         20 return crc16($hashtag) % MAX_SLOTS;
204             }
205              
206             sub _init {
207 7     7   29 my $self = shift;
208              
209 7         20 $self->_discover_cluster;
210              
211 7 100       21 if ( $self->{refresh_interval} > 0 ) {
212 6         33 $self->{_refresh_timestamp} = [Time::HiRes::gettimeofday];
213             }
214              
215 7         13 return;
216             }
217              
218             sub _discover_cluster {
219 7     7   13 my $self = shift;
220              
221 7         9 my $nodes;
222              
223 7 100       16 if ( defined $self->{_slots} ) {
224 1         5 $nodes = $self->_nodes( undef, $self->{allow_slaves} );
225             }
226             else {
227 6         10 my %nodes_pool;
228              
229 6         8 foreach my $hostport ( @{ $self->{startup_nodes} } ) {
  6         14  
230 18 50       84 unless ( defined $nodes_pool{$hostport} ) {
231 18         38 $nodes_pool{$hostport} = $self->_new_node($hostport);
232             }
233             }
234              
235 6         33 $self->{_nodes_pool} = \%nodes_pool;
236 6         17 $nodes = [ keys %nodes_pool ];
237             }
238              
239 7         30 $self->_run_command( 'cluster_state', [], $nodes );
240 7         18 my $slots = $self->_run_command( 'cluster_slots', [], $nodes );
241              
242 7 50       13 unless ( @{$slots} ) {
  7         20  
243 0         0 croak 'ERR Returned empty list of slots';
244             }
245              
246 7         23 $self->_prepare_nodes($slots);
247              
248 7 100       23 unless ( defined $self->{_commands} ) {
249 6         34 $self->_load_commands;
250             }
251              
252 7         32 return;
253             }
254              
255             sub _prepare_nodes {
256 7     7   11 my $self = shift;
257 7         10 my $slots_raw = shift;
258              
259 7         18 my %nodes_pool;
260             my @slots;
261 7         0 my @masters_nodes;
262              
263 7         12 my $nodes_pool_old = $self->{_nodes_pool};
264              
265 7         12 foreach my $range ( @{$slots_raw} ) {
  7         14  
266 28         74 my $range_start = shift @{$range};
  28         43  
267 28         34 my $range_end = shift @{$range};
  28         37  
268              
269 28         34 my @nodes;
270 28         33 my $is_master = 1;
271              
272 28         30 foreach my $node_info ( @{$range} ) {
  28         45  
273 70         128 my $hostport = "$node_info->[0]:$node_info->[1]";
274              
275 70 100       119 unless ( defined $nodes_pool{$hostport} ) {
276 49 100       99 if ( defined $nodes_pool_old->{$hostport} ) {
277 25         51 $nodes_pool{$hostport} = delete $nodes_pool_old->{$hostport};
278             }
279             else {
280 24         43 $nodes_pool{$hostport} = $self->_new_node($hostport);
281             }
282              
283 49 100       172 if ($is_master) {
284 21         35 push( @masters_nodes, $hostport );
285 21         36 $is_master = 0;
286             }
287             }
288              
289 70         115 push( @nodes, $hostport );
290             }
291              
292 28         69 push( @slots, [ $range_start, $range_end, \@nodes ] );
293             }
294              
295 7         30 @slots = sort { $a->[0] <=> $b->[0] } @slots;
  28         63  
296              
297 7         18 $self->{_nodes_pool} = \%nodes_pool;
298 7         27 $self->{_nodes} = [ keys %nodes_pool ];
299 7         33 $self->{_master_nodes} = \@masters_nodes;
300 7         16 $self->{_slots} = \@slots;
301              
302 7         38 return;
303             }
304              
305             sub _load_commands {
306 6     6   13 my $self = shift;
307              
308 6         24 my $nodes = $self->_nodes( undef, $self->{allow_slaves} );
309 6         18 my $commands_raw = $self->_run_command( 'command', [], $nodes );
310              
311 6         37 my %commands = %PREDEFINED_CMDS;
312              
313 6         12 foreach my $cmd_raw ( @{$commands_raw} ) {
  6         12  
314 30         56 my $kwd = lc( $cmd_raw->[0] );
315              
316 30 50       50 next if exists $commands{$kwd};
317              
318 30         35 my $readonly = 0;
319 30         33 foreach my $flag ( @{ $cmd_raw->[2] } ) {
  30         41  
320 48 100       79 if ( $flag eq 'readonly' ) {
321 12         15 $readonly = 1;
322 12         18 last;
323             }
324             }
325              
326 30         92 $commands{$kwd} = {
327             readonly => $readonly,
328             key_pos => $cmd_raw->[3],
329             };
330             }
331              
332 6         14 $self->{_commands} = \%commands;
333              
334 6         20 return;
335             }
336              
337             sub _new_node {
338 43     43   52 my $self = shift;
339 43         52 my $hostport = shift;
340              
341             return Redis->new(
342 43         52 %{ $self->{_node_params} },
  43         118  
343             server => $hostport,
344             reconnect => 0.001, # reconnect only once
345             every => 1000,
346             no_auto_connect_on_new => 1,
347              
348             on_connect => $self->_create_on_node_connect($hostport),
349             );
350             }
351              
352             sub _create_on_node_connect {
353 43     43   54 my $self = shift;
354 43         48 my $hostport = shift;
355              
356 43         103 weaken($self);
357              
358             return sub {
359 43     43   13270 my $redis = shift;
360              
361 43 100       91 if ( $self->{allow_slaves} ) {
362 14         48 $redis->readonly;
363             }
364              
365 43 50       2412 if ( defined $self->{on_node_connect} ) {
366 0         0 $self->{on_node_connect}->($hostport);
367             }
368 43         207 };
369             }
370              
371             sub _route {
372 10     10   14 my $self = shift;
373 10         14 my $cmd_name = shift;
374 10         13 my $args = shift;
375              
376 10 100 33     82 if ( !defined $self->{_slots} || (
      66        
377             $self->{refresh_interval} > 0
378             && Time::HiRes::tv_interval( $self->{_refresh_timestamp} )
379             > $self->{refresh_interval} ) )
380             {
381 1         3 $self->_init;
382             }
383              
384 10         147 my $key;
385 10         29 my @kwds = split( m/_/, lc($cmd_name) );
386 10         29 my $cmd_info = $self->{_commands}{ $kwds[0] };
387              
388 10 100       20 if ( defined $cmd_info ) {
389 8 100 33     21 if ( $cmd_info->{key_pos} > 0 ) {
    50          
390 7         15 $key = $args->[ $cmd_info->{key_pos} - scalar @kwds ];
391             }
392             # Exception for EVAL and EVALSHA commands
393             elsif ( $cmd_info->{movablekeys}
394             && $args->[1] > 0 )
395             {
396 0         0 $key = $args->[2];
397             }
398             }
399              
400 10         13 my $slot;
401 10         17 my $allow_slaves = $self->{allow_slaves};
402              
403 10 100       18 if ( defined $key ) {
404 7         15 $slot = hash_slot($key);
405 7   100     22 $allow_slaves &&= $cmd_info->{readonly};
406             }
407              
408 10         20 my $nodes = $self->_nodes( $slot, $allow_slaves );
409              
410 10 50       21 unless ( defined $nodes ) {
411 0         0 croak 'ERR Target node not found. Maybe not all slots are served';
412             }
413              
414 10         32 return $self->_run_command( $cmd_name, $args, $nodes );
415             }
416              
417             sub _run_command {
418 31     31   44 my $self = shift;
419 31         44 my $cmd_name = shift;
420 31         38 my $args = shift;
421 31         39 my $nodes = shift;
422              
423 31         39 my $nodes_pool = $self->{_nodes_pool};
424              
425 31         35 my $nodes_num = scalar @{$nodes};
  31         45  
426 31         70 my $node_index = int( rand($nodes_num) );
427 31         42 my $fails_cnt = 0;
428 31         45 my $wantarray = wantarray;
429              
430 31 100       63 my $cmd_method
431             = $cmd_name eq 'cluster_state'
432             ? 'cluster_info'
433             : $cmd_name;
434              
435 31         39 while (1) {
436 32         45 my $hostport = $nodes->[$node_index];
437 32         44 my $node = $nodes_pool->{$hostport};
438              
439 32         58 my $reply;
440             my @arr_reply;
441 32         0 my $err_msg;
442              
443             {
444 32         36 local $@;
  32         40  
445              
446 32         53 eval {
447 32 100       61 if ( $cmd_name eq 'cluster_state' ) {
    100          
448 7         11 undef $wantarray;
449 7         10 my $reply_raw = $node->$cmd_method( @{$args} );
  7         35  
450 7         2245 $reply = _parse_info($reply_raw);
451              
452 7 50       22 if ( $reply->{cluster_state} eq 'ok' ) {
453 7         19 $reply = 1;
454             }
455             else {
456 0         0 croak 'CLUSTERDOWN The cluster is down';
457             }
458             }
459             elsif ( $wantarray ) {
460 1         2 @arr_reply = $node->$cmd_method( @{$args} );
  1         5  
461             }
462             else {
463 24         30 $reply = $node->$cmd_method( @{$args} );
  24         123  
464             }
465             };
466              
467 32 100       3689 if ($@) {
468 3         5 $err_msg = $@;
469             }
470             }
471              
472 32 100       63 if ($err_msg) {
473 3         5 my $err_code = 'ERR';
474 3 50       15 if ( $err_msg =~ m/^(?:\[\w+\]\s+)?([A-Z]{3,})/ ) {
475 3         7 $err_code = $1;
476             }
477              
478 3 100 66     14 if ( $err_code eq 'MOVED' || $err_code eq 'ASK' ) {
479 1 50       3 if ( $err_code eq 'MOVED' ) {
480 1         4 $self->_init;
481             }
482              
483 1         6 my ($fwd_hostport) = ( split( m/\s+/, $err_msg ) )[3];
484 1         4 $fwd_hostport =~ s/,$//;
485              
486 1 50       3 unless ( defined $nodes_pool->{$fwd_hostport} ) {
487 1         3 $nodes_pool->{$fwd_hostport} = $self->_new_node( $fwd_hostport );
488             }
489              
490 1         7 return $self->_run_command( $cmd_name, $args, [ $fwd_hostport ] );
491             }
492              
493 2 50       5 if ( defined $self->{on_node_error} ) {
494 0         0 $self->{on_node_error}->( $err_msg, $hostport );
495             }
496              
497 2 100       4 if ( ++$fails_cnt < $nodes_num ) {
498 1 50       3 if ( ++$node_index == $nodes_num ) {
499 0         0 $node_index = 0;
500             }
501              
502 1         2 next;
503             }
504              
505 1         7 die $err_msg;
506             }
507              
508 29 100       102 return $wantarray ? @arr_reply : $reply;
509             }
510             }
511              
512             sub _nodes {
513 21     21   28 my $self = shift;
514 21         27 my $slot = shift;
515 21         22 my $allow_slaves = shift;
516              
517 21 100       41 if ( defined $slot ) {
518             my ($range) = bsearch {
519 18 50   18   50 $slot > $_->[1] ? -1 : $slot < $_->[0] ? 1 : 0;
    100          
520             }
521 9         33 @{ $self->{_slots} };
  9         45  
522              
523 9 50       33 return unless defined $range;
524              
525 9 100       28 return $allow_slaves
526             ? $range->[2]
527             : [ $range->[2][0] ];
528             }
529              
530             return $allow_slaves
531             ? $self->{_nodes}
532 12 100       34 : $self->{_master_nodes};
533             }
534              
535             sub _parse_info {
536 7         33 return { map { split( m/:/, $_, 2 ) }
537 7     7   26 grep { m/^[^#]/ } split( EOL, $_[0] ) };
  7         33  
538             }
539              
540             sub AUTOLOAD {
541 5     5   2475 our $AUTOLOAD;
542 5         9 my $cmd_name = $AUTOLOAD;
543 5         28 $cmd_name =~ s/^.+:://;
544              
545             my $sub = sub {
546 9     9   627 my $self = shift;
547 9         28 return $self->_route( $cmd_name, [@_] );
548 5         19 };
549              
550 5         7 do {
551 5     5   43 no strict 'refs';
  5         9  
  5         443  
552 5         23 *{$cmd_name} = $sub;
  5         16  
553             };
554              
555 5         7 goto &{$sub};
  5         14  
556             }
557              
558       0     sub DESTROY { }
559              
560             1;
561             __END__