File Coverage

blib/lib/Redis.pm
Criterion Covered Total %
statement 227 514 44.1
branch 55 260 21.1
condition 34 143 23.7
subroutine 43 84 51.1
pod 16 16 100.0
total 375 1017 36.8


line stmt bran cond sub pod time code
1             #
2             # This file is part of Redis
3             #
4             # This software is Copyright (c) 2015 by Pedro Melo, Damien Krotkine.
5             #
6             # This is free software, licensed under:
7             #
8             # The Artistic License 2.0 (GPL Compatible)
9             #
10             package Redis;
11             $Redis::VERSION = '2.000';
12             # ABSTRACT: Perl binding for Redis database
13             # VERSION
14             # AUTHORITY
15              
16 15     15   984885 use warnings;
  15         170  
  15         492  
17 15     15   81 use strict;
  15         48  
  15         316  
18              
19 15     15   7844 use IO::Socket::INET;
  15         332840  
  15         94  
20 15     15   6376 use IO::Socket::UNIX;
  15         37  
  15         90  
21 15     15   19146 use IO::Socket::Timeout;
  15         93498  
  15         135  
22 15     15   7361 use IO::Select;
  15         25836  
  15         750  
23 15     15   112 use IO::Handle;
  15         39  
  15         582  
24 15     15   105 use Fcntl qw( O_NONBLOCK F_SETFL );
  15         30  
  15         696  
25 15     15   75 use Errno ();
  15         33  
  15         220  
26 15     15   9880 use Data::Dumper;
  15         109636  
  15         929  
27 15     15   139 use Carp;
  15         49  
  15         729  
28 15     15   3116 use Try::Tiny;
  15         12615  
  15         732  
29 15     15   109 use Scalar::Util ();
  15         26  
  15         259  
30              
31 15     15   6701 use Redis::Sentinel;
  15         39  
  15         628  
32              
33 15     15   122 use constant SSL_AVAILABLE => eval { require IO::Socket::SSL };
  15         28  
  15         29  
  15         12489  
34              
35 15     15   973749 use constant WIN32 => $^O =~ /mswin32/i;
  15         31  
  15         1282  
36 15   50 15   106 use constant EWOULDBLOCK => eval {Errno::EWOULDBLOCK} || -1E9;
  15         40  
  15         167  
37 15   50 15   91 use constant EAGAIN => eval {Errno::EAGAIN} || -1E9;
  15         60  
  15         67  
38 15   50 15   98 use constant EINTR => eval {Errno::EINTR} || -1E9;
  15         31  
  15         27  
39 15   50 15   105 use constant ECONNRESET => eval {Errno::ECONNRESET} || -1E9;
  15         36  
  15         41  
40              
41             # According to IO::Socket::SSL documentation, 16k is the maximum
42             # size of an SSL frame and because sysread returns data from only
43             # a single SSL frame you guarantee this way, that there is no pending
44             # data.
45 15     15   87 use constant BUFSIZE => 16_384;
  15         28  
  15         22317  
46              
47             sub _maybe_enable_timeouts {
48 2     2   45678 my ($self, $socket) = @_;
49 2 50       14 $socket or return;
50             defined $self->{read_timeout} || defined $self->{write_timeout}
51 2 50 33     12 or return $socket;
52 2         120 IO::Socket::Timeout->enable_timeouts_on($socket);
53             defined $self->{read_timeout}
54 2 50       1446 and $socket->read_timeout($self->{read_timeout});
55             defined $self->{write_timeout}
56 2 50       286 and $socket->write_timeout($self->{write_timeout});
57 2         40 $socket;
58             }
59              
60             sub new {
61 2     2 1 37971 my ($class, %args) = @_;
62 2         31 my $self = bless {}, $class;
63              
64 2         68 $self->{__buf} = '';
65 2   33     80 $self->{debug} = $args{debug} || $ENV{REDIS_DEBUG};
66              
67             ## Deal with REDIS_SERVER ENV
68 2 0 33     88 if ($ENV{REDIS_SERVER} && ! defined $args{sock} && ! defined $args{server} && ! defined $args{sentinels}) {
      33        
      0        
69 0 0       0 if ($ENV{REDIS_SERVER} =~ m!^/!) {
    0          
    0          
70 0         0 $args{sock} = $ENV{REDIS_SERVER};
71             }
72             elsif ($ENV{REDIS_SERVER} =~ m!^unix:(.+)!) {
73 0         0 $args{sock} = $1;
74             }
75             elsif ($ENV{REDIS_SERVER} =~ m!^(?:tcp:)?(.+)!) {
76 0         0 $args{server} = $1;
77             }
78             }
79              
80             defined $args{$_}
81 2   66     62 and $self->{$_} = $args{$_} for
82             qw(username password on_connect name no_auto_connect_on_new cnx_timeout
83             write_timeout read_timeout sentinels_ssl sentinels_username sentinels_password
84             sentinels_cnx_timeout sentinels_write_timeout sentinels_read_timeout no_sentinels_list_update);
85              
86 2   50     33 $self->{reconnect} = $args{reconnect} || 0;
87 2   50     42 $self->{conservative_reconnect} = $args{conservative_reconnect} || 0;
88 2   50     55 $self->{every} = $args{every} || 1000;
89              
90 2 50       27 if (defined $args{sock}) {
    50          
91 0         0 $self->{server} = $args{sock};
92             $self->{builder} = sub {
93 0     0   0 my ($self) = @_;
94             $self->_maybe_enable_timeouts(
95             IO::Socket::UNIX->new(
96             Peer => $self->{server},
97 0 0       0 ( $self->{cnx_timeout} ? ( Timeout => $self->{cnx_timeout} ): () ),
98             )
99             );
100 0         0 };
101             } elsif ($args{sentinels}) {
102 0         0 $self->{sentinels} = $args{sentinels};
103              
104 0 0       0 ref $self->{sentinels} eq 'ARRAY'
105             or croak("'sentinels' param must be an ArrayRef");
106              
107             defined($self->{service} = $args{service})
108 0 0       0 or croak("Need 'service' name when using 'sentinels'!");
109              
110             $self->{builder} = sub {
111 0     0   0 my ($self) = @_;
112             # try to connect to a sentinel
113 0         0 my $status;
114 0         0 foreach my $sentinel_address (@{$self->{sentinels}}) {
  0         0  
115 0 0       0 my $sentinel = eval {
116             Redis::Sentinel->new(
117             server => $sentinel_address,
118             username => $self->{sentinels_username},
119             password => $self->{sentinels_password},
120             cnx_timeout => ( defined $self->{sentinels_cnx_timeout}
121             ? $self->{sentinels_cnx_timeout} : 0.1 ),
122             read_timeout => ( defined $self->{sentinels_read_timeout}
123             ? $self->{sentinels_read_timeout} : 1 ),
124             write_timeout => ( defined $self->{sentinels_write_timeout}
125             ? $self->{sentinels_write_timeout} : 1 ),
126             ssl => ( defined $self->{sentinels_ssl}
127 0 0       0 ? $self->{sentinels_ssl} : 0 ),
    0          
    0          
    0          
128             )
129             } or next;
130 0         0 my $server_address = $sentinel->get_service_address($self->{service});
131 0 0 0     0 defined $server_address
132             or $status ||= "Sentinels don't know this service",
133             next;
134 0 0       0 $server_address eq 'IDONTKNOW'
135             and $status = "service is configured in one Sentinel, but was never reached",
136             next;
137              
138             # we found the service, set the server
139 0         0 $self->{server} = $server_address;
140              
141 0 0       0 if (! $self->{no_sentinels_list_update} ) {
142             # move the elected sentinel at the front of the list and add
143             # additional sentinels
144 0         0 my $idx = 2;
145 0         0 my %h = ( ( map { $_ => $idx++ } @{$self->{sentinels}}),
  0         0  
  0         0  
146             $sentinel_address => 1,
147             );
148             $self->{sentinels} = [
149 0         0 ( sort { $h{$a} <=> $h{$b} } keys %h ), # sorted existing sentinels,
150 0         0 grep { ! $h{$_}; } # list of unknown
151 0         0 map { +{ @$_ }->{name}; } # names of
152             $sentinel->sentinel( # sentinels
153             sentinels => $self->{service} # for this service
154             )
155 0         0 ];
156             }
157              
158 0         0 my $socket_class;
159              
160             my %socket_args = (
161             PeerAddr => $server_address,
162 0 0       0 ( $self->{cnx_timeout} ? ( Timeout => $self->{cnx_timeout} ) : () ),
163             );
164              
165 0 0 0     0 if (exists $args{ssl} and $args{ssl}) {
166 0         0 if ( ! SSL_AVAILABLE ) {
167             croak("IO::Socket::SSL is required for connecting to Redis using SSL");
168             }
169              
170 0         0 $self->{ssl} = 1;
171 0         0 $socket_class = 'IO::Socket::SSL';
172 0   0     0 $socket_args{SSL_verify_mode} = $args{SSL_verify_mode} // 1;
173             }
174             else {
175 0         0 $self->{ssl} = 0;
176 0         0 $socket_class = 'IO::Socket::INET';
177             }
178              
179 0         0 return $self->_maybe_enable_timeouts(
180             $socket_class->new(%socket_args)
181             );
182             }
183 0   0     0 croak($status || "failed to connect to any of the sentinels");
184 0         0 };
185             } else {
186 2 50       20 $self->{server} = defined $args{server} ? $args{server} : '127.0.0.1:6379';
187             $self->{builder} = sub {
188 2     2   5 my ($self) = @_;
189              
190 2         2 my $socket_class;
191              
192             my %socket_args = (
193             PeerAddr => $self->{server},
194 2 50       30 ( $self->{cnx_timeout} ? ( Timeout => $self->{cnx_timeout} ) : () ),
195             );
196              
197 2 50 33     20 if (exists $args{ssl} and $args{ssl}) {
198 2         3 if ( ! SSL_AVAILABLE ) {
199             croak("IO::Socket::SSL is required for connecting to Redis using SSL");
200             }
201              
202 2         4 $self->{ssl} = 1;
203 2         18 $socket_class = 'IO::Socket::SSL';
204 2   50     43 $socket_args{SSL_verify_mode} = $args{SSL_verify_mode} // 1;
205             }
206             else {
207 0         0 $self->{ssl} = 0;
208 0         0 $socket_class = 'IO::Socket::INET';
209             }
210              
211 2         84 return $self->_maybe_enable_timeouts(
212             $socket_class->new(%socket_args)
213             );
214 2         124 };
215             }
216              
217 2         15 $self->{is_subscriber} = 0;
218 2         14 $self->{subscribers} = {};
219              
220 2 50       86 $self->connect unless $args{no_auto_connect_on_new};
221              
222 2         11 return $self;
223             }
224              
225 0     0 1 0 sub is_subscriber { $_[0]{is_subscriber} }
226              
227             sub select {
228 0     0 1 0 my $self = shift;
229 0         0 my $database = shift;
230              
231 0 0       0 croak( "Cannot select an undefined redis database" )
232             unless defined $database;
233             # don't want to send multiple select() back and forth
234 0 0 0     0 if (!defined $self->{current_database} or $self->{current_database} ne $database) {
235 0         0 my $ret = $self->__std_cmd('select', $database, @_);
236 0         0 $self->{current_database} = $database;
237 0         0 return $ret;
238             };
239 0         0 return "OK"; # emulate redis response as of 3.0.6 just in case anybody cares
240             }
241              
242             ### we don't want DESTROY to fallback into AUTOLOAD
243       0     sub DESTROY { }
244              
245              
246             ### Deal with common, general case, Redis commands
247             our $AUTOLOAD;
248              
249             sub AUTOLOAD {
250 1     1   3247 my $command = $AUTOLOAD;
251 1         27 $command =~ s/.*://;
252              
253 1     2   23 my $method = sub { shift->__std_cmd($command, @_) };
  2         3149  
254              
255             # Save this method for future calls
256 15     15   163 no strict 'refs';
  15         53  
  15         79065  
257 1         52 *$AUTOLOAD = $method;
258              
259 1         23 goto $method;
260             }
261              
262             sub __std_cmd {
263 2     2   11 my $self = shift;
264 2         13 my $command = shift;
265              
266 2         33 $self->__is_valid_command($command);
267              
268 2 50 33     48 my $cb = @_ && ref $_[-1] eq 'CODE' ? pop : undef;
269              
270             # If this is an EXEC command, in pipelined mode, and one of the commands
271             # executed in the transaction yields an error, we must collect all errors
272             # from that command, rather than throwing an exception immediately.
273 2         10 my $uc_command = uc($command);
274 2   33     6 my $collect_errors = $cb && $uc_command eq 'EXEC';
275              
276 2 50 33     46 if ($uc_command eq 'MULTI') {
    50          
    50          
    50          
277 0         0 $self->{__inside_transaction} = 1;
278             } elsif ($uc_command eq 'EXEC' || $uc_command eq 'DISCARD') {
279 0         0 delete $self->{__inside_transaction};
280 0         0 delete $self->{__inside_watch};
281             } elsif ($uc_command eq 'WATCH') {
282 0         0 $self->{__inside_watch} = 1;
283             } elsif ($uc_command eq 'UNWATCH') {
284 0         0 delete $self->{__inside_watch};
285             }
286              
287             ## Fast path, no reconnect;
288             $self->{reconnect}
289 2 50       48 or return $self->__run_cmd($command, $collect_errors, undef, $cb, @_);
290              
291 0         0 my @cmd_args = @_;
292             $self->__with_reconnect(
293             sub {
294 0     0   0 $self->__run_cmd($command, $collect_errors, undef, $cb, @cmd_args);
295             }
296 0         0 );
297             }
298              
299             sub __with_reconnect {
300 0     0   0 my ($self, $cb) = @_;
301              
302             ## Fast path, no reconnect
303             $self->{reconnect}
304 0 0       0 or return $cb->();
305              
306             return &try(
307             $cb,
308             catch {
309 0 0   0   0 ref($_) eq 'Redis::X::Reconnect'
310             or die $_;
311              
312             $self->{__inside_transaction} || $self->{__inside_watch}
313 0 0 0     0 and croak("reconnect disabled inside transaction or watch");
314              
315 0 0       0 scalar @{$self->{queue} || []} && $self->{conservative_reconnect}
316 0 0 0     0 and croak("reconnect disabled while responses are pending and conservative reconnect mode enabled");
317              
318 0         0 $self->connect;
319 0         0 $cb->();
320             }
321 0         0 );
322             }
323              
324             sub __run_cmd {
325 2     2   24 my ($self, $command, $collect_errors, $custom_decode, $cb, @args) = @_;
326              
327 2         6 my $ret;
328             my $wrapper = $cb && $custom_decode
329             ? sub {
330 0     0   0 my ($reply, $error) = @_;
331 0         0 $cb->(scalar $custom_decode->($reply), $error);
332             }
333             : $cb || sub {
334 1     1   10 my ($reply, $error) = @_;
335 1         4 croak "[$command] $error, " if defined $error;
336 1         2 $ret = $reply;
337 2 50 33     66 };
      50        
338              
339 2         43 $self->__send_command($command, @args);
340 2         4 push @{ $self->{queue} }, [$command, $wrapper, $collect_errors];
  2         22  
341              
342 2 50       7 return 1 if $cb;
343              
344 2         23 $self->wait_all_responses;
345             return
346 1 50 33     12 $custom_decode ? $custom_decode->($ret, !wantarray)
    50          
347             : wantarray && ref $ret eq 'ARRAY' ? @$ret
348             : $ret;
349             }
350              
351             sub wait_all_responses {
352 2     2 1 6 my ($self) = @_;
353              
354 2         7 my $queue = $self->{queue};
355 2         19 $self->wait_one_response while @$queue;
356              
357 1         2 return;
358             }
359              
360             sub wait_one_response {
361 2     2 1 5 my ($self) = @_;
362              
363 2         5 my $handler = shift @{ $self->{queue} };
  2         6  
364 2 50       7 return unless $handler;
365              
366 2         8 my ($command, $cb, $collect_errors) = @$handler;
367 2         17 $cb->($self->__read_response($command, $collect_errors));
368              
369 1         4 return;
370             }
371              
372              
373             ### Commands with extra logic
374             sub quit {
375 0     0 1 0 my ($self) = @_;
376 0 0       0 return unless $self->{sock};
377              
378 0 0 0     0 croak "[quit] only works in synchronous mode, "
379             if @_ && ref $_[-1] eq 'CODE';
380              
381             try {
382 0     0   0 $self->wait_all_responses;
383 0         0 $self->__send_command('QUIT');
384 0         0 };
385              
386 0         0 $self->__close_sock();
387              
388 0         0 return 1;
389             }
390              
391             sub shutdown {
392 0     0 1 0 my ($self) = @_;
393 0         0 $self->__is_valid_command('SHUTDOWN');
394              
395 0 0 0     0 croak "[shutdown] only works in synchronous mode, "
396             if @_ && ref $_[-1] eq 'CODE';
397              
398 0 0       0 return unless $self->{sock};
399              
400 0         0 $self->wait_all_responses;
401 0         0 $self->__send_command('SHUTDOWN');
402 0 0       0 $self->__close_sock() || croak("Can't close socket: $!");
403              
404 0         0 return 1;
405             }
406              
407             sub ping {
408 0     0 1 0 my $self = shift;
409 0         0 $self->__is_valid_command('PING');
410              
411 0 0 0     0 croak "[ping] only works in synchronous mode, "
412             if @_ && ref $_[-1] eq 'CODE';
413              
414 0 0       0 return unless defined $self->{sock};
415              
416 0         0 $self->wait_all_responses;
417             return scalar try {
418 0     0   0 $self->__std_cmd('PING');
419             }
420             catch {
421 0     0   0 $self->__close_sock();
422 0         0 return;
423 0         0 };
424             }
425              
426             sub info {
427 0     0 1 0 my $self = shift;
428 0         0 $self->__is_valid_command('INFO');
429              
430             my $custom_decode = sub {
431 0     0   0 my ($reply) = @_;
432 0 0 0     0 return $reply if !defined $reply || ref $reply;
433 0         0 return { map { split(/:/, $_, 2) } grep {/^[^#]/} split(/\r\n/, $reply) };
  0         0  
  0         0  
434 0         0 };
435              
436 0 0 0     0 my $cb = @_ && ref $_[-1] eq 'CODE' ? pop : undef;
437              
438             ## Fast path, no reconnect
439             return $self->__run_cmd('INFO', 0, $custom_decode, $cb, @_)
440 0 0       0 unless $self->{reconnect};
441              
442 0         0 my @cmd_args = @_;
443             $self->__with_reconnect(
444             sub {
445 0     0   0 $self->__run_cmd('INFO', 0, $custom_decode, $cb, @cmd_args);
446             }
447 0         0 );
448             }
449              
450             sub keys {
451 0     0 1 0 my $self = shift;
452 0         0 $self->__is_valid_command('KEYS');
453              
454             my $custom_decode = sub {
455 0     0   0 my ($reply, $synchronous_scalar) = @_;
456              
457             ## Support redis <= 1.2.6
458 0 0 0     0 $reply = [split(/\s/, $reply)] if defined $reply && !ref $reply;
459              
460 0 0 0     0 return ref $reply && ($synchronous_scalar || wantarray) ? @$reply : $reply;
461 0         0 };
462              
463 0 0 0     0 my $cb = @_ && ref $_[-1] eq 'CODE' ? pop : undef;
464              
465             ## Fast path, no reconnect
466             return $self->__run_cmd('KEYS', 0, $custom_decode, $cb, @_)
467 0 0       0 unless $self->{reconnect};
468              
469 0         0 my @cmd_args = @_;
470             $self->__with_reconnect(
471             sub {
472 0     0   0 $self->__run_cmd('KEYS', 0, $custom_decode, $cb, @cmd_args);
473             }
474 0         0 );
475             }
476              
477              
478             ### PubSub
479             sub wait_for_messages {
480 0     0 1 0 my ($self, $timeout) = @_;
481              
482 0         0 my $s = IO::Select->new;
483              
484 0         0 my $count = 0;
485              
486              
487 0         0 my $e;
488              
489             try {
490             $self->__with_reconnect( sub {
491              
492             # the socket can be changed due to reconnection, so get it each time
493 0         0 my $sock = $self->{sock};
494 0         0 $s->remove($s->handles);
495 0         0 $s->add($sock);
496              
497 0         0 while ($s->can_read($timeout)) {
498 0         0 my $has_stuff = $self->__try_read_sock($sock);
499             # If the socket is ready to read but there is nothing to read, ( so
500             # it's an EOF ), try to reconnect.
501 0 0       0 defined $has_stuff
502             or $self->__throw_reconnect('EOF from server');
503              
504 0         0 my $cond;
505              
506 0 0       0 if ( ! $self->{ssl} ) {
507             $cond = sub {
508             # if __try_read_sock() return 0 (no data)
509             # or undef ( socket became EOF), back to select until timeout
510 0   0     0 return $self->{__buf} || $self->__try_read_sock($sock);
511             }
512 0         0 } else {
513             $cond = sub {
514             # continue if there is still some data left. If the buffer is
515             # larger than 16K, there won't be any pending data left though
516 0   0     0 return $self->{__buf} || $sock->pending;
517             }
518 0         0 }
519              
520 0         0 do {
521 0         0 my ($reply, $error) = $self->__read_response('WAIT_FOR_MESSAGES');
522 0 0       0 croak "[WAIT_FOR_MESSAGES] $error, " if defined $error;
523 0         0 $self->__process_pubsub_msg($reply);
524 0         0 $count++;
525              
526             } while ($cond->());
527             }
528              
529 0     0   0 });
530              
531             } catch {
532 0     0   0 $e = $_;
533 0         0 };
534              
535             # if We had an error and it was not an EOF, die
536 0 0 0     0 defined $e && $e ne 'EOF from server'
537             and die $e;
538              
539 0         0 return $count;
540             }
541              
542             sub __subscription_cmd {
543 0     0   0 my $self = shift;
544 0         0 my $pr = shift;
545 0         0 my $unsub = shift;
546 0         0 my $command = shift;
547 0         0 my $cb = pop;
548              
549 0 0       0 croak("Missing required callback in call to $command(), ")
550             unless ref($cb) eq 'CODE';
551              
552 0         0 $self->wait_all_responses;
553              
554 0         0 my @subs = @_;
555             $self->__with_reconnect(
556             sub {
557             $self->__throw_reconnect('Not connected to any server')
558 0 0   0   0 unless $self->{sock};
559              
560 0 0       0 @subs = $self->__process_unsubscribe_requests($cb, $pr, @subs)
561             if $unsub;
562 0 0       0 return unless @subs;
563              
564 0         0 $self->__send_command($command, @subs);
565              
566 0         0 my %cbs = map { ("${pr}message:$_" => $cb) } @subs;
  0         0  
567 0         0 return $self->__process_subscription_changes($command, \%cbs);
568             }
569 0         0 );
570             }
571              
572 0     0 1 0 sub subscribe { shift->__subscription_cmd('', 0, subscribe => @_) }
573 0     0 1 0 sub psubscribe { shift->__subscription_cmd('p', 0, psubscribe => @_) }
574 0     0 1 0 sub unsubscribe { shift->__subscription_cmd('', 1, unsubscribe => @_) }
575 0     0 1 0 sub punsubscribe { shift->__subscription_cmd('p', 1, punsubscribe => @_) }
576              
577             sub __process_unsubscribe_requests {
578 0     0   0 my ($self, $cb, $pr, @unsubs) = @_;
579 0         0 my $subs = $self->{subscribers};
580              
581 0         0 my @subs_to_unsubscribe;
582 0         0 for my $sub (@unsubs) {
583 0         0 my $key = "${pr}message:$sub";
584 0         0 my $cbs = $subs->{$key} = [grep { $_ ne $cb } @{ $subs->{$key} }];
  0         0  
  0         0  
585 0 0       0 next if @$cbs;
586              
587 0         0 delete $subs->{$key};
588 0         0 push @subs_to_unsubscribe, $sub;
589             }
590              
591 0         0 return @subs_to_unsubscribe;
592             }
593              
594             sub __process_subscription_changes {
595 0     0   0 my ($self, $cmd, $expected) = @_;
596 0         0 my $subs = $self->{subscribers};
597              
598 0         0 while (%$expected) {
599 0         0 my ($m, $error) = $self->__read_response($cmd);
600 0 0       0 croak "[$cmd] $error, " if defined $error;
601              
602             ## Deal with pending PUBLISH'ed messages
603 0 0       0 if ($m->[0] =~ /^p?message$/) {
604 0         0 $self->__process_pubsub_msg($m);
605 0         0 next;
606             }
607              
608 0         0 my ($key, $unsub) = $m->[0] =~ m/^(p)?(un)?subscribe$/;
609 0         0 $key .= "message:$m->[1]";
610 0         0 my $cb = delete $expected->{$key};
611              
612 0 0       0 push @{ $subs->{$key} }, $cb unless $unsub;
  0         0  
613              
614 0         0 $self->{is_subscriber} = $m->[2];
615             }
616             }
617              
618             sub __process_pubsub_msg {
619 0     0   0 my ($self, $m) = @_;
620 0         0 my $subs = $self->{subscribers};
621              
622 0         0 my $sub = $m->[1];
623 0         0 my $cbid = "$m->[0]:$sub";
624 0         0 my $data = pop @$m;
625 0 0       0 my $topic = defined $m->[2] ? $m->[2] : $sub;
626              
627 0 0       0 if (!defined $subs->{$cbid}) {
628 0         0 warn "Message for topic '$topic' ($cbid) without expected callback, ";
629 0         0 return;
630             }
631              
632 0         0 $_->($data, $topic, $sub) for @{ $subs->{$cbid} };
  0         0  
633              
634 0         0 return 1;
635              
636             }
637              
638              
639             ### Mode validation
640             sub __is_valid_command {
641 2     2   9 my ($self, $cmd) = @_;
642              
643             croak("Cannot use command '$cmd' while in SUBSCRIBE mode, ")
644 2 50       124 if $self->{is_subscriber};
645             }
646              
647              
648             ### Socket operations
649             sub connect {
650 2     2 1 21 my ($self) = @_;
651 2         7 delete $self->{sock};
652 2         29 delete $self->{__inside_watch};
653 2         5 delete $self->{__inside_transaction};
654              
655             # Suppose we have at least one command response pending, but we're about
656             # to reconnect. The new connection will never get a response to any of
657             # the pending commands, so delete all those pending responses now.
658 2         28 $self->{queue} = [];
659 2         47 $self->{pid} = $$;
660              
661             ## Fast path, no reconnect
662 2 50       63 return $self->__build_sock() unless $self->{reconnect};
663              
664             ## Use precise timers on reconnections
665 0         0 require Time::HiRes;
666 0         0 my $t0 = [Time::HiRes::gettimeofday()];
667              
668             ## Reconnect...
669 0         0 while (1) {
670 0         0 eval { $self->__build_sock };
  0         0  
671              
672 0 0       0 last unless $@; ## Connected!
673 0 0       0 die if Time::HiRes::tv_interval($t0) > $self->{reconnect}; ## Timeout
674 0         0 Time::HiRes::usleep($self->{every}); ## Retry in...
675             }
676              
677 0         0 return;
678             }
679              
680             sub __build_sock {
681 2     2   7 my ($self) = @_;
682              
683             do {
684 2         7 $self->{sock} = $self->{builder}->($self);
685 2   33     3 } while (!$self->{sock} && $! == Errno::EINTR);
686              
687 2 50       28 unless ($self->{sock}) {
688 0         0 croak("Could not connect to Redis server at $self->{server}: $!");
689             }
690              
691 2         38 $self->{__buf} = '';
692              
693 2 50 33     22 if (defined $self->{username} && defined $self->{password}) {
    50          
694 0     0   0 try { $self->auth($self->{username}, $self->{password}) }
695             catch {
696 0     0   0 my $error = $_;
697 0         0 $self->{reconnect} = 0;
698 0         0 croak('Redis server authentication error: ' . $error);
699 0         0 };
700             } elsif (defined $self->{password}) {
701 0     0   0 try { $self->auth($self->{password}) }
702             catch {
703 0     0   0 my $error = $_;
704 0         0 $self->{reconnect} = 0;
705 0         0 croak('Redis server authentication error: ' . $error);
706 0         0 };
707             }
708              
709 2         50 $self->__on_connection;
710              
711 2         7 return;
712             }
713              
714             sub __close_sock {
715 1     1   12 my ($self) = @_;
716 1         23 $self->{__buf} = '';
717 1         18 delete $self->{__inside_watch};
718 1         39 delete $self->{__inside_transaction};
719 1 50       29 defined $self->{sock} or return 1;
720 1         35 return close(delete $self->{sock});
721             }
722              
723             sub __on_connection {
724              
725 2     2   9 my ($self) = @_;
726              
727             # If we are in PubSub mode we shouldn't perform any command besides
728             # (p)(un)subscribe
729 2 50       11 if (! $self->{is_subscriber}) {
730             defined $self->{name}
731             and try {
732 0     0   0 my $n = $self->{name};
733 0 0       0 $n = $n->($self) if ref($n) eq 'CODE';
734 0 0       0 $self->client_setname($n) if defined $n;
735 2 50       10 };
736              
737             # don't use select() function as it's caching database name,
738             # rather call select directly
739             defined $self->{current_database}
740 2 50       9 and $self->__std_cmd('select', $self->{current_database});
741             }
742              
743 2         7 foreach my $topic (CORE::keys(%{$self->{subscribers}})) {
  2         38  
744 0 0       0 if ($topic =~ /(p?message):(.*)$/ ) {
745 0         0 my ($key, $channel) = ($1, $2);
746 0 0       0 if ($key eq 'message') {
747 0         0 $self->__send_command('subscribe', $channel);
748 0         0 my (undef, $error) = $self->__read_response('subscribe');
749 0 0       0 defined $error
750             and croak "[subscribe] $error";
751             } else {
752 0         0 $self->__send_command('psubscribe', $channel);
753 0         0 my (undef, $error) = $self->__read_response('psubscribe');
754 0 0       0 defined $error
755             and croak "[psubscribe] $error";
756             }
757             }
758             }
759              
760             defined $self->{on_connect}
761 2 50       19 and $self->{on_connect}->($self);
762              
763             }
764              
765              
766             sub __send_command {
767 2     2   6 my $self = shift;
768 2         7 my $cmd = uc(shift);
769 2         14 my $deb = $self->{debug};
770              
771             # if already connected but after a fork, reconnect
772 2 50 50     87 if ($self->{sock} && ($self->{pid} || 0) != $$) {
      33        
773 0         0 $self->connect;
774             }
775              
776             my $sock = $self->{sock}
777 2   33     14 || $self->__throw_reconnect('Not connected to any server');
778              
779 2 50       7 warn "[SEND] $cmd ", Dumper([@_]) if $deb;
780              
781             ## Encode command using multi-bulk format
782 2         14 my @cmd = split /_/, $cmd;
783 2         9 my $n_elems = scalar(@_) + scalar(@cmd);
784 2         17 my $buf = "\*$n_elems\r\n";
785 2         30 for my $bin (@cmd, @_) {
786 4 50       22 utf8::downgrade($bin, 1)
787             or croak "command sent is not an octet sequence in the native encoding (Latin-1). Consider using debug mode to see the command itself.";
788 4 50       23 $buf .= defined($bin) ? '$' . length($bin) . "\r\n$bin\r\n" : "\$-1\r\n";
789             }
790              
791             # this function works differently with a SSL socket cause it's not
792             # possible to read just a few bytes from a TLS frame.
793 2         48 my $status = $self->__try_read_sock($sock);
794 2 50       9 $self->__throw_reconnect('Not connected to any server')
795             unless defined $status;
796              
797             ## Send command, take care for partial writes
798 2 50       8 warn "[SEND RAW] $buf" if $deb;
799 2         21 while ($buf) {
800 2         35 my $len = syswrite $sock, $buf, length $buf;
801 2 50       403 $self->__throw_reconnect("Could not write to Redis server: $!")
802             unless defined $len;
803 2         31 substr $buf, 0, $len, "";
804             }
805              
806 2         9 return;
807             }
808              
809             sub __read_response {
810 2     2   8 my ($self, $cmd, $collect_errors) = @_;
811              
812 2 50       7 croak("Not connected to any server") unless $self->{sock};
813              
814 2         40 local $/ = "\r\n";
815              
816             ## no debug => fast path
817 2 50       39 return $self->__read_response_r($cmd, $collect_errors) unless $self->{debug};
818              
819 0         0 my ($result, $error) = $self->__read_response_r($cmd, $collect_errors);
820 0         0 warn "[RECV] $cmd ", Dumper($result, $error);
821 0         0 return $result, $error;
822             }
823              
824             sub __read_response_r {
825 2     2   9 my ($self, $command, $collect_errors) = @_;
826              
827 2         9 my ($type, $result) = $self->__read_line;
828              
829 1 50 33     17 if ($type eq '-') {
    50          
    0          
    0          
830 0         0 return undef, $result;
831             }
832             elsif ($type eq '+' || $type eq ':') {
833 1         8 return $result, undef;
834             }
835             elsif ($type eq '$') {
836 0 0       0 return undef, undef if $result < 0;
837 0         0 return $self->__read_len($result + 2), undef;
838             }
839             elsif ($type eq '*') {
840 0 0       0 return undef, undef if $result < 0;
841              
842 0         0 my @list;
843 0         0 while ($result--) {
844 0         0 my @nested = $self->__read_response_r($command, $collect_errors);
845 0 0       0 if ($collect_errors) {
846 0         0 push @list, \@nested;
847             }
848             else {
849 0 0       0 croak "[$command] $nested[1], " if defined $nested[1];
850 0         0 push @list, $nested[0];
851             }
852             }
853 0         0 return \@list, undef;
854             }
855             else {
856 0         0 croak "unknown answer type: $type ($result), ";
857             }
858             }
859              
860             sub __read_line {
861 2     2   5 my $self = $_[0];
862 2         6 my $sock = $self->{sock};
863              
864 2         7 my $data = $self->__read_line_raw;
865 2 100       12 if (! defined $data) {
866             # In case the caller catches the exception and wants to persist on using
867             # the redis connection, let's forbid that.
868 1         41 $self->__close_sock();
869 1         2005 croak("Error while reading from Redis server: $!")
870             }
871              
872 1         2 chomp $data;
873 1 50       3 warn "[RECV RAW] '$data'" if $self->{debug};
874              
875 1         3 my $type = substr($data, 0, 1, '');
876 1         4 return ($type, $data);
877             }
878              
879             sub __read_line_raw {
880 2     2   6 my $self = $_[0];
881 2         6 my $sock = $self->{sock};
882 2         5 my $buf = \$self->{__buf};
883              
884 2 50       21 if (length $$buf) {
885 0         0 my $idx = index($$buf, "\r\n");
886 0 0       0 $idx >= 0 and return substr($$buf, 0, $idx + 2, '');
887             }
888              
889 2         6 while (1) {
890 2         28 my $bytes = sysread($sock, $$buf, BUFSIZE, length($$buf));
891 2 50 66     1035668 next if !defined $bytes && $! == EINTR;
892 2 100 66     35 return unless defined $bytes && $bytes;
893              
894             # start looking for \r\n where we stopped last time
895             # extracting one is required to handle corner case
896             # where \r\n are split and therefore read by two conseqent sysreads
897 1         5 my $idx = index($$buf, "\r\n", length($$buf) - $bytes - 1);
898 1 50       7 $idx >= 0 and return substr($$buf, 0, $idx + 2, '');
899             }
900             }
901              
902             sub __read_len {
903 0     0   0 my ($self, $len) = @_;
904 0         0 my $buf = \$self->{__buf};
905 0         0 my $buflen = length($$buf);
906              
907 0 0       0 if ($buflen < $len) {
908 0         0 my $to_read = $len - $buflen;
909 0         0 while ($to_read > 0) {
910 0         0 my $bytes = sysread($self->{sock}, $$buf, BUFSIZE, length($$buf));
911 0 0 0     0 next if !defined $bytes && $! == EINTR;
912 0 0       0 croak("Error while reading from Redis server: $!") unless defined $bytes;
913 0 0       0 croak("Redis server closed connection") unless $bytes;
914 0         0 $to_read -= $bytes;
915             }
916             }
917              
918 0         0 my $data = substr($$buf, 0, $len, '');
919 0         0 chomp $data;
920 0 0       0 warn "[RECV RAW] '$data'" if $self->{debug};
921              
922 0         0 return $data;
923             }
924              
925             sub __try_read_sock {
926 2     2   10 my ($self, $sock) = @_;
927 2         24 my $data = '';
928              
929 2         7 while (1) {
930             # WIN32 doesn't support MSG_DONTWAIT,
931             # need to swith fh to nonblockng mode manually.
932             # For Unix still use MSG_DONTWAIT because of fewer syscalls
933 2         5 my ($res, $err);
934 2 50       57 if (WIN32) {
935 0         0 __fh_nonblocking_win32($sock, 1);
936 0         0 $res = recv($sock, $data, BUFSIZE, 0);
937 0         0 $err = 0 + $!;
938 0         0 __fh_nonblocking_win32($sock, 0);
939             } else {
940 2 50       11 if ($self->{ssl}) {
941             ## use peek to see if there is any data available instead of reading
942             ## it cause it's not possible to read only a few bytes from an SSL
943             ## frame. This does not work in WIN32
944 2         57 $sock->blocking(0);
945 2         127 $res = $sock->peek($data, BUFSIZE);
946 2         519 $sock->blocking(1);
947             } else {
948 0         0 $res = recv($sock, $data, BUFSIZE, MSG_DONTWAIT);
949             }
950              
951 2         57 $err = 0 + $!;
952             }
953              
954 2 50       10 if (defined $res) {
955             ## have read some data
956 0 0       0 if (length($data)) {
957 0 0       0 $self->{__buf} .= $data unless $self->{ssl};
958 0         0 return 1;
959             }
960              
961             ## no data but also no error means EOF
962 0         0 return;
963             }
964              
965 2 50 33     62 next if $err && $err == EINTR;
966              
967             ## Keep going if nothing there, but socket is alive
968 2 50 33     26 return 0 if $err and ($err == EWOULDBLOCK or $err == EAGAIN);
      33        
969              
970             ## if we got ECONNRESET, it might be due a timeout from the other side (on freebsd)
971             ## or because an intermediate proxy shut down our connection using its internal timeout counter
972 0 0 0       return 0 if ($err && $err == ECONNRESET);
973              
974             ## result is undef but err is 0? should never happen
975 0 0         return if $err == 0;
976              
977             ## For everything else, there is Mastercard...
978 0           croak("Unexpected error condition $err/$^O, please report this as a bug");
979             }
980             }
981              
982             ## Copied from AnyEvent::Util
983             sub __fh_nonblocking_win32 {
984 0     0     ioctl $_[0], 0x8004667e, pack "L", $_[1];
985             }
986              
987             ##########################
988             # I take exception to that
989              
990             sub __throw_reconnect {
991 0     0     my ($self, $m) = @_;
992 0 0         die bless(\$m, 'Redis::X::Reconnect') if $self->{reconnect};
993 0           die $m;
994             }
995              
996              
997             1; # End of Redis.pm
998              
999             __END__