File Coverage

blib/lib/Redis/ClusterRider.pm
Criterion Covered Total %
statement 276 285 96.8
branch 86 104 82.6
condition 12 18 66.6
subroutine 32 33 96.9
pod 6 6 100.0
total 412 446 92.3


line stmt bran cond sub pod time code
1             package Redis::ClusterRider;
2              
3 5     5   316255 use 5.008000;
  5         32  
4 5     5   22 use strict;
  5         8  
  5         105  
5 5     5   21 use warnings;
  5         9  
  5         129  
6 5     5   21 use base qw( Exporter );
  5         9  
  5         729  
7              
8             our $VERSION = '0.22';
9              
10 5     5   2847 use Redis;
  5         207816  
  5         160  
11 5     5   2530 use List::MoreUtils qw( bsearch );
  5         57641  
  5         25  
12 5     5   4498 use Scalar::Util qw( looks_like_number weaken );
  5         12  
  5         263  
13 5     5   23 use Time::HiRes;
  5         10  
  5         30  
14 5     5   422 use Carp qw( croak );
  5         10  
  5         247  
15              
16             BEGIN {
17 5     5   185 our @EXPORT_OK = qw( crc16 hash_slot );
18             }
19              
20             use constant {
21 5         11822 D_REFRESH_INTERVAL => 15,
22             MAX_SLOTS => 16384,
23             EOL => "\r\n",
24 5     5   26 };
  5         10  
25              
26             my @CRC16_TAB = (
27             0x0000, 0x1021, 0x2042, 0x3063, 0x4084, 0x50a5, 0x60c6, 0x70e7,
28             0x8108, 0x9129, 0xa14a, 0xb16b, 0xc18c, 0xd1ad, 0xe1ce, 0xf1ef,
29             0x1231, 0x0210, 0x3273, 0x2252, 0x52b5, 0x4294, 0x72f7, 0x62d6,
30             0x9339, 0x8318, 0xb37b, 0xa35a, 0xd3bd, 0xc39c, 0xf3ff, 0xe3de,
31             0x2462, 0x3443, 0x0420, 0x1401, 0x64e6, 0x74c7, 0x44a4, 0x5485,
32             0xa56a, 0xb54b, 0x8528, 0x9509, 0xe5ee, 0xf5cf, 0xc5ac, 0xd58d,
33             0x3653, 0x2672, 0x1611, 0x0630, 0x76d7, 0x66f6, 0x5695, 0x46b4,
34             0xb75b, 0xa77a, 0x9719, 0x8738, 0xf7df, 0xe7fe, 0xd79d, 0xc7bc,
35             0x48c4, 0x58e5, 0x6886, 0x78a7, 0x0840, 0x1861, 0x2802, 0x3823,
36             0xc9cc, 0xd9ed, 0xe98e, 0xf9af, 0x8948, 0x9969, 0xa90a, 0xb92b,
37             0x5af5, 0x4ad4, 0x7ab7, 0x6a96, 0x1a71, 0x0a50, 0x3a33, 0x2a12,
38             0xdbfd, 0xcbdc, 0xfbbf, 0xeb9e, 0x9b79, 0x8b58, 0xbb3b, 0xab1a,
39             0x6ca6, 0x7c87, 0x4ce4, 0x5cc5, 0x2c22, 0x3c03, 0x0c60, 0x1c41,
40             0xedae, 0xfd8f, 0xcdec, 0xddcd, 0xad2a, 0xbd0b, 0x8d68, 0x9d49,
41             0x7e97, 0x6eb6, 0x5ed5, 0x4ef4, 0x3e13, 0x2e32, 0x1e51, 0x0e70,
42             0xff9f, 0xefbe, 0xdfdd, 0xcffc, 0xbf1b, 0xaf3a, 0x9f59, 0x8f78,
43             0x9188, 0x81a9, 0xb1ca, 0xa1eb, 0xd10c, 0xc12d, 0xf14e, 0xe16f,
44             0x1080, 0x00a1, 0x30c2, 0x20e3, 0x5004, 0x4025, 0x7046, 0x6067,
45             0x83b9, 0x9398, 0xa3fb, 0xb3da, 0xc33d, 0xd31c, 0xe37f, 0xf35e,
46             0x02b1, 0x1290, 0x22f3, 0x32d2, 0x4235, 0x5214, 0x6277, 0x7256,
47             0xb5ea, 0xa5cb, 0x95a8, 0x8589, 0xf56e, 0xe54f, 0xd52c, 0xc50d,
48             0x34e2, 0x24c3, 0x14a0, 0x0481, 0x7466, 0x6447, 0x5424, 0x4405,
49             0xa7db, 0xb7fa, 0x8799, 0x97b8, 0xe75f, 0xf77e, 0xc71d, 0xd73c,
50             0x26d3, 0x36f2, 0x0691, 0x16b0, 0x6657, 0x7676, 0x4615, 0x5634,
51             0xd94c, 0xc96d, 0xf90e, 0xe92f, 0x99c8, 0x89e9, 0xb98a, 0xa9ab,
52             0x5844, 0x4865, 0x7806, 0x6827, 0x18c0, 0x08e1, 0x3882, 0x28a3,
53             0xcb7d, 0xdb5c, 0xeb3f, 0xfb1e, 0x8bf9, 0x9bd8, 0xabbb, 0xbb9a,
54             0x4a75, 0x5a54, 0x6a37, 0x7a16, 0x0af1, 0x1ad0, 0x2ab3, 0x3a92,
55             0xfd2e, 0xed0f, 0xdd6c, 0xcd4d, 0xbdaa, 0xad8b, 0x9de8, 0x8dc9,
56             0x7c26, 0x6c07, 0x5c64, 0x4c45, 0x3ca2, 0x2c83, 0x1ce0, 0x0cc1,
57             0xef1f, 0xff3e, 0xcf5d, 0xdf7c, 0xaf9b, 0xbfba, 0x8fd9, 0x9ff8,
58             0x6e17, 0x7e36, 0x4e55, 0x5e74, 0x2e93, 0x3eb2, 0x0ed1, 0x1ef0,
59             );
60              
61             my %PREDEFINED_CMDS = (
62             sort => { readonly => 0, key_pos => 1 },
63             zunionstore => { readonly => 0, key_pos => 1 },
64             zinterstore => { readonly => 0, key_pos => 1 },
65             eval => { readonly => 0, movablekeys => 1, key_pos => 0 },
66             evalsha => { readonly => 0, movablekeys => 1, key_pos => 0 },
67             );
68              
69             $Carp::Internal{ (__PACKAGE__) }++;
70              
71              
72             sub new {
73 12     12 1 6082 my $class = shift;
74 12         32 my %params = @_;
75              
76 12         25 my $self = bless {}, $class;
77              
78 12 100       35 unless ( defined $params{startup_nodes} ) {
79 1         161 croak 'Startup nodes not specified';
80             }
81 11 100       29 unless ( ref( $params{startup_nodes} ) eq 'ARRAY' ) {
82 1         76 croak 'Startup nodes must be specified as array reference';
83             }
84 10 100       14 unless ( @{ $params{startup_nodes} } ) {
  10         24  
85 1         71 croak 'Specified empty list of startup nodes';
86             }
87              
88 9         44 $self->{startup_nodes} = $params{startup_nodes};
89 9         18 $self->{allow_slaves} = $params{allow_slaves};
90 9         14 $self->{lazy} = $params{lazy};
91 9         30 $self->refresh_interval( $params{refresh_interval} );
92              
93 7         12 $self->{on_node_connect} = $params{on_node_connect};
94 7         12 $self->{on_node_error} = $params{on_node_error};
95              
96 7         10 my %node_params;
97 7         12 foreach my $name ( qw( conservative_reconnect cnx_timeout read_timeout
98             write_timeout password name debug ) )
99             {
100 49 100       92 next unless defined $params{$name};
101 6         9 $node_params{$name} = $params{$name};
102             }
103 7         13 $self->{_node_params} = \%node_params;
104              
105 7         18 $self->{_nodes_pool} = undef;
106 7         9 $self->{_nodes} = undef;
107 7         11 $self->{_master_nodes} = undef;
108 7         13 $self->{_slots} = undef;
109 7         10 $self->{_commands} = undef;
110 7         11 $self->{_refresh_timestamp} = undef;
111              
112 7 100       14 unless ( $self->{lazy} ) {
113 5         11 $self->_init;
114             }
115              
116 7         27 return $self;
117             }
118              
119             sub run_command {
120 1     1 1 568 my $self = shift;
121 1         2 my $cmd_name = shift;
122              
123 1         4 return $self->_route( $cmd_name, [ @_ ] );
124             }
125              
126             sub nodes {
127 4     4 1 1613 my $self = shift;
128 4         6 my $key = shift;
129 4         4 my $allow_slaves = shift;
130              
131 4 50       10 unless ( defined $self->{_slots} ) {
132 0         0 $self->_init;
133             }
134              
135 4         4 my $slot;
136 4 100       10 if ( defined $key ) {
137 2         6 $slot = hash_slot($key);
138             }
139              
140 4         9 my $nodes = $self->_nodes( $slot, $allow_slaves );
141              
142             return wantarray
143 4         11 ? @{ $self->{_nodes_pool} }{ @{$nodes} }
  4         6  
144 4 50       8 : $self->{_nodes_pool}{ $nodes->[0] };
145             }
146              
147             sub refresh_interval {
148 16     16 1 1119 my $self = shift;
149              
150 16 100       36 if (@_) {
151 13         16 my $seconds = shift;
152              
153 13 100       30 if ( defined $seconds ) {
154 10 100 100     53 if ( !looks_like_number($seconds) || $seconds < 0 ) {
155 4         302 croak qq{"refresh_interval" must be a positive number};
156             }
157 6         12 $self->{refresh_interval} = $seconds;
158             }
159             else {
160 3         6 $self->{refresh_interval} = D_REFRESH_INTERVAL;
161             }
162             }
163              
164 12         23 return $self->{refresh_interval};
165             }
166              
167             sub crc16 {
168 11     11 1 77 my $data = shift;
169              
170 11 50       30 unless ( utf8::downgrade( $data, 1 ) ) {
171 0         0 utf8::encode($data);
172             }
173              
174 11         15 my $crc = 0;
175 11         31 foreach my $char ( split //, $data ) {
176 39         72 $crc = ( $crc << 8 & 0xff00 )
177             ^ $CRC16_TAB[ ( ( $crc >> 8 ) ^ ord($char) ) & 0x00ff ];
178             }
179              
180 11         26 return $crc;
181             }
182              
183             sub hash_slot {
184 10     10 1 616 my $key = shift;
185              
186 10         11 my $hashtag = $key;
187              
188 10 100       30 if ( $key =~ m/\{([^}]*?)\}/ ) {
189 1 50       4 if ( length $1 > 0 ) {
190 1         2 $hashtag = $1;
191             }
192             }
193              
194 10         18 return crc16($hashtag) % MAX_SLOTS;
195             }
196              
197             sub _init {
198 7     7   15 my $self = shift;
199              
200 7         15 $self->_discover_cluster;
201              
202 7 100       26 if ( $self->{refresh_interval} > 0 ) {
203 6         25 $self->{_refresh_timestamp} = [Time::HiRes::gettimeofday];
204             }
205              
206 7         13 return;
207             }
208              
209             sub _discover_cluster {
210 7     7   10 my $self = shift;
211              
212 7         10 my $nodes;
213              
214 7 100       16 if ( defined $self->{_slots} ) {
215 1         3 $nodes = $self->_nodes( undef, $self->{allow_slaves} );
216             }
217             else {
218 6         7 my %nodes_pool;
219              
220 6         9 foreach my $hostport ( @{ $self->{startup_nodes} } ) {
  6         12  
221 18 50       84 unless ( defined $nodes_pool{$hostport} ) {
222 18         35 $nodes_pool{$hostport} = $self->_new_node($hostport);
223             }
224             }
225              
226 6         34 $self->{_nodes_pool} = \%nodes_pool;
227 6         17 $nodes = [ keys %nodes_pool ];
228             }
229              
230 7         25 $self->_run_command( 'cluster_state', [], $nodes );
231 7         16 my $slots = $self->_run_command( 'cluster_slots', [], $nodes );
232              
233 7 50       12 unless ( @{$slots} ) {
  7         17  
234 0         0 croak 'ERR Returned empty list of slots';
235             }
236              
237 7         20 $self->_prepare_nodes($slots);
238              
239 7 100       33 unless ( defined $self->{_commands} ) {
240 6         19 $self->_load_commands;
241             }
242              
243 7         26 return;
244             }
245              
246             sub _prepare_nodes {
247 7     7   9 my $self = shift;
248 7         11 my $slots_raw = shift;
249              
250 7         14 my %nodes_pool;
251             my @slots;
252 7         0 my @masters_nodes;
253              
254 7         12 my $nodes_pool_old = $self->{_nodes_pool};
255              
256 7         38 foreach my $range ( @{$slots_raw} ) {
  7         17  
257 28         35 my $range_start = shift @{$range};
  28         39  
258 28         34 my $range_end = shift @{$range};
  28         32  
259              
260 28         38 my @nodes;
261 28         31 my $is_master = 1;
262              
263 28         30 foreach my $node_info ( @{$range} ) {
  28         41  
264 70         126 my $hostport = "$node_info->[0]:$node_info->[1]";
265              
266 70 100       118 unless ( defined $nodes_pool{$hostport} ) {
267 49 100       84 if ( defined $nodes_pool_old->{$hostport} ) {
268 25         46 $nodes_pool{$hostport} = delete $nodes_pool_old->{$hostport};
269             }
270             else {
271 24         41 $nodes_pool{$hostport} = $self->_new_node($hostport);
272             }
273              
274 49 100       159 if ($is_master) {
275 21         29 push( @masters_nodes, $hostport );
276 21         27 $is_master = 0;
277             }
278             }
279              
280 70         115 push( @nodes, $hostport );
281             }
282              
283 28         73 push( @slots, [ $range_start, $range_end, \@nodes ] );
284             }
285              
286 7         44 @slots = sort { $a->[0] <=> $b->[0] } @slots;
  28         60  
287              
288 7         30 $self->{_nodes_pool} = \%nodes_pool;
289 7         35 $self->{_nodes} = [ keys %nodes_pool ];
290 7         33 $self->{_master_nodes} = \@masters_nodes;
291 7         26 $self->{_slots} = \@slots;
292              
293 7         19 return;
294             }
295              
296             sub _load_commands {
297 6     6   22 my $self = shift;
298              
299 6         17 my $nodes = $self->_nodes( undef, $self->{allow_slaves} );
300 6         17 my $commands_raw = $self->_run_command( 'command', [], $nodes );
301              
302 6         31 my %commands = %PREDEFINED_CMDS;
303              
304 6         11 foreach my $cmd_raw ( @{$commands_raw} ) {
  6         12  
305 30         49 my $kwd = lc( $cmd_raw->[0] );
306              
307 30 50       52 next if exists $commands{$kwd};
308              
309 30         33 my $readonly = 0;
310 30         31 foreach my $flag ( @{ $cmd_raw->[2] } ) {
  30         42  
311 48 100       75 if ( $flag eq 'readonly' ) {
312 12         14 $readonly = 1;
313 12         23 last;
314             }
315             }
316              
317 30         79 $commands{$kwd} = {
318             readonly => $readonly,
319             key_pos => $cmd_raw->[3],
320             };
321             }
322              
323 6         15 $self->{_commands} = \%commands;
324              
325 6         21 return;
326             }
327              
328             sub _new_node {
329 43     43   57 my $self = shift;
330 43         59 my $hostport = shift;
331              
332             return Redis->new(
333 43         50 %{ $self->{_node_params} },
  43         113  
334             server => $hostport,
335             reconnect => 0.001, # reconnect only once
336             every => 1000,
337             no_auto_connect_on_new => 1,
338              
339             on_connect => $self->_create_on_node_connect($hostport),
340             );
341             }
342              
343             sub _create_on_node_connect {
344 43     43   55 my $self = shift;
345 43         52 my $hostport = shift;
346              
347 43         103 weaken($self);
348              
349             return sub {
350 43     43   13649 my $redis = shift;
351              
352 43 100       97 if ( $self->{allow_slaves} ) {
353 14         68 $redis->readonly;
354             }
355              
356 43 50       2807 if ( defined $self->{on_node_connect} ) {
357 0         0 $self->{on_node_connect}->($hostport);
358             }
359 43         189 };
360             }
361              
362             sub _route {
363 10     10   15 my $self = shift;
364 10         13 my $cmd_name = shift;
365 10         10 my $args = shift;
366              
367 10 100 33     69 if ( !defined $self->{_slots} || (
      66        
368             $self->{refresh_interval} > 0
369             && Time::HiRes::tv_interval( $self->{_refresh_timestamp} )
370             > $self->{refresh_interval} ) )
371             {
372 1         2 $self->_init;
373             }
374              
375 10         132 my $key;
376 10         28 my @kwds = split( m/_/, lc($cmd_name) );
377 10         19 my $cmd_info = $self->{_commands}{ $kwds[0] };
378              
379 10 100       17 if ( defined $cmd_info ) {
380 8 100 33     18 if ( $cmd_info->{key_pos} > 0 ) {
    50          
381 7         12 $key = $args->[ $cmd_info->{key_pos} - scalar @kwds ];
382             }
383             # Exception for EVAL and EVALSHA commands
384             elsif ( $cmd_info->{movablekeys}
385             && $args->[1] > 0 )
386             {
387 0         0 $key = $args->[2];
388             }
389             }
390              
391 10         13 my $slot;
392 10         14 my $allow_slaves = $self->{allow_slaves};
393              
394 10 100       14 if ( defined $key ) {
395 7         14 $slot = hash_slot($key);
396 7   100     27 $allow_slaves &&= $cmd_info->{readonly};
397             }
398              
399 10         21 my $nodes = $self->_nodes( $slot, $allow_slaves );
400              
401 10 50       18 unless ( defined $nodes ) {
402 0         0 croak 'ERR Target node not found. Maybe not all slots are served';
403             }
404              
405 10         23 return $self->_run_command( $cmd_name, $args, $nodes );
406             }
407              
408             sub _run_command {
409 31     31   39 my $self = shift;
410 31         40 my $cmd_name = shift;
411 31         37 my $args = shift;
412 31         40 my $nodes = shift;
413              
414 31         178 my $nodes_pool = $self->{_nodes_pool};
415              
416 31         34 my $nodes_num = scalar @{$nodes};
  31         41  
417 31         157 my $node_index = int( rand($nodes_num) );
418 31         39 my $fails_cnt = 0;
419 31         46 my $wantarray = wantarray;
420              
421 31 100       59 my $cmd_method
422             = $cmd_name eq 'cluster_state'
423             ? 'cluster_info'
424             : $cmd_name;
425              
426 31         37 while (1) {
427 32         55 my $hostport = $nodes->[$node_index];
428 32         41 my $node = $nodes_pool->{$hostport};
429              
430 32         62 my $reply;
431             my @arr_reply;
432 32         0 my $err_msg;
433              
434             {
435 32         35 local $@;
  32         39  
436              
437 32         44 eval {
438 32 100       58 if ( $cmd_name eq 'cluster_state' ) {
    100          
439 7         11 undef $wantarray;
440 7         10 my $reply_raw = $node->$cmd_method( @{$args} );
  7         42  
441 7         1371 $reply = _parse_info($reply_raw);
442              
443 7 50       27 if ( $reply->{cluster_state} eq 'ok' ) {
444 7         16 $reply = 1;
445             }
446             else {
447 0         0 croak 'CLUSTERDOWN The cluster is down';
448             }
449             }
450             elsif ( $wantarray ) {
451 1         1 @arr_reply = $node->$cmd_method( @{$args} );
  1         6  
452             }
453             else {
454 24         28 $reply = $node->$cmd_method( @{$args} );
  24         125  
455             }
456             };
457              
458 32 100       3561 if ($@) {
459 3         7 $err_msg = $@;
460             }
461             }
462              
463 32 100       55 if ($err_msg) {
464 3         4 my $err_code = 'ERR';
465 3 50       13 if ( $err_msg =~ m/^(?:\[\w+\]\s+)?([A-Z]{3,})/ ) {
466 3         14 $err_code = $1;
467             }
468              
469 3 100 66     11 if ( $err_code eq 'MOVED' || $err_code eq 'ASK' ) {
470 1 50       3 if ( $err_code eq 'MOVED' ) {
471 1         3 $self->_init;
472             }
473              
474 1         7 my ($fwd_hostport) = ( split( m/\s+/, $err_msg ) )[3];
475 1         3 $fwd_hostport =~ s/,$//;
476              
477 1 50       12 unless ( defined $nodes_pool->{$fwd_hostport} ) {
478 1         4 $nodes_pool->{$fwd_hostport} = $self->_new_node( $fwd_hostport );
479             }
480              
481 1         7 return $self->_run_command( $cmd_name, $args, [ $fwd_hostport ] );
482             }
483              
484 2 50       5 if ( defined $self->{on_node_error} ) {
485 0         0 $self->{on_node_error}->( $err_msg, $hostport );
486             }
487              
488 2 100       5 if ( ++$fails_cnt < $nodes_num ) {
489 1 50       2 if ( ++$node_index == $nodes_num ) {
490 0         0 $node_index = 0;
491             }
492              
493 1         2 next;
494             }
495              
496 1         5 die $err_msg;
497             }
498              
499 29 100       96 return $wantarray ? @arr_reply : $reply;
500             }
501             }
502              
503             sub _nodes {
504 21     21   30 my $self = shift;
505 21         30 my $slot = shift;
506 21         28 my $allow_slaves = shift;
507              
508 21 100       39 if ( defined $slot ) {
509             my ($range) = bsearch {
510 18 50   18   43 $slot > $_->[1] ? -1 : $slot < $_->[0] ? 1 : 0;
    100          
511             }
512 9         32 @{ $self->{_slots} };
  9         30  
513              
514 9 50       32 return unless defined $range;
515              
516 9 100       23 return $allow_slaves
517             ? $range->[2]
518             : [ $range->[2][0] ];
519             }
520              
521             return $allow_slaves
522             ? $self->{_nodes}
523 12 100       32 : $self->{_master_nodes};
524             }
525              
526             sub _parse_info {
527 7         29 return { map { split( m/:/, $_, 2 ) }
528 7     7   25 grep { m/^[^#]/ } split( EOL, $_[0] ) };
  7         31  
529             }
530              
531             sub AUTOLOAD {
532 5     5   2489 our $AUTOLOAD;
533 5         7 my $cmd_name = $AUTOLOAD;
534 5         28 $cmd_name =~ s/^.+:://;
535              
536             my $sub = sub {
537 9     9   624 my $self = shift;
538 9         23 return $self->_route( $cmd_name, [@_] );
539 5         20 };
540              
541 5         8 do {
542 5     5   37 no strict 'refs';
  5         9  
  5         419  
543 5         5 *{$cmd_name} = $sub;
  5         15  
544             };
545              
546 5         7 goto &{$sub};
  5         11  
547             }
548              
549       0     sub DESTROY { }
550              
551             1;
552             __END__