File Coverage

blib/lib/Redis.pm
Criterion Covered Total %
statement 225 503 44.7
branch 53 254 20.8
condition 33 138 23.9
subroutine 43 82 52.4
pod 16 16 100.0
total 370 993 37.2


line stmt bran cond sub pod time code
1             #
2             # This file is part of Redis
3             #
4             # This software is Copyright (c) 2015 by Pedro Melo, Damien Krotkine.
5             #
6             # This is free software, licensed under:
7             #
8             # The Artistic License 2.0 (GPL Compatible)
9             #
10             package Redis;
11             $Redis::VERSION = '1.999';
12             # ABSTRACT: Perl binding for Redis database
13             # VERSION
14             # AUTHORITY
15              
16 15     15   996306 use warnings;
  15         147  
  15         550  
17 15     15   81 use strict;
  15         27  
  15         323  
18              
19 15     15   7644 use IO::Socket::INET;
  15         336798  
  15         98  
20 15     15   7058 use IO::Socket::UNIX;
  15         33  
  15         115  
21 15     15   20316 use IO::Socket::Timeout;
  15         97234  
  15         129  
22 15     15   8159 use IO::Select;
  15         26027  
  15         792  
23 15     15   109 use IO::Handle;
  15         31  
  15         540  
24 15     15   84 use Fcntl qw( O_NONBLOCK F_SETFL );
  15         31  
  15         669  
25 15     15   80 use Errno ();
  15         25  
  15         228  
26 15     15   10385 use Data::Dumper;
  15         108578  
  15         1101  
27 15     15   125 use Carp;
  15         30  
  15         751  
28 15     15   3020 use Try::Tiny;
  15         12523  
  15         820  
29 15     15   103 use Scalar::Util ();
  15         33  
  15         246  
30              
31 15     15   6732 use Redis::Sentinel;
  15         42  
  15         616  
32              
33 15     15   110 use constant SSL_AVAILABLE => eval { require IO::Socket::SSL };
  15         30  
  15         28  
  15         12462  
34              
35 15     15   980509 use constant WIN32 => $^O =~ /mswin32/i;
  15         40  
  15         1295  
36 15   50 15   110 use constant EWOULDBLOCK => eval {Errno::EWOULDBLOCK} || -1E9;
  15         43  
  15         174  
37 15   50 15   94 use constant EAGAIN => eval {Errno::EAGAIN} || -1E9;
  15         59  
  15         73  
38 15   50 15   101 use constant EINTR => eval {Errno::EINTR} || -1E9;
  15         30  
  15         26  
39 15   50 15   103 use constant ECONNRESET => eval {Errno::ECONNRESET} || -1E9;
  15         29  
  15         38  
40              
41             # According to IO::Socket::SSL documentation, 16k is the maximum
42             # size of an SSL frame and because sysread returns data from only
43             # a single SSL frame you guarantee this way, that there is no pending
44             # data.
45 15     15   94 use constant BUFSIZE => 16_384;
  15         29  
  15         21468  
46              
47             sub _maybe_enable_timeouts {
48 2     2   41448 my ($self, $socket) = @_;
49 2 50       18 $socket or return;
50             defined $self->{read_timeout} || defined $self->{write_timeout}
51 2 50 33     17 or return $socket;
52 2         102 IO::Socket::Timeout->enable_timeouts_on($socket);
53             defined $self->{read_timeout}
54 2 50       880 and $socket->read_timeout($self->{read_timeout});
55             defined $self->{write_timeout}
56 2 50       194 and $socket->write_timeout($self->{write_timeout});
57 2         14 $socket;
58             }
59              
60             sub new {
61 2     2 1 42420 my ($class, %args) = @_;
62 2         35 my $self = bless {}, $class;
63              
64 2         60 $self->{__buf} = '';
65 2   33     76 $self->{debug} = $args{debug} || $ENV{REDIS_DEBUG};
66              
67             ## Deal with REDIS_SERVER ENV
68 2 0 33     56 if ($ENV{REDIS_SERVER} && ! defined $args{sock} && ! defined $args{server} && ! defined $args{sentinels}) {
      33        
      0        
69 0 0       0 if ($ENV{REDIS_SERVER} =~ m!^/!) {
    0          
    0          
70 0         0 $args{sock} = $ENV{REDIS_SERVER};
71             }
72             elsif ($ENV{REDIS_SERVER} =~ m!^unix:(.+)!) {
73 0         0 $args{sock} = $1;
74             }
75             elsif ($ENV{REDIS_SERVER} =~ m!^(?:tcp:)?(.+)!) {
76 0         0 $args{server} = $1;
77             }
78             }
79              
80             defined $args{$_}
81 2   66     48 and $self->{$_} = $args{$_} for
82             qw(password on_connect name no_auto_connect_on_new cnx_timeout
83             write_timeout read_timeout sentinels_cnx_timeout sentinels_write_timeout
84             sentinels_read_timeout no_sentinels_list_update);
85              
86 2   50     27 $self->{reconnect} = $args{reconnect} || 0;
87 2   50     22 $self->{conservative_reconnect} = $args{conservative_reconnect} || 0;
88 2   50     9 $self->{every} = $args{every} || 1000;
89              
90 2 50       24 if (defined $args{sock}) {
    50          
91 0         0 $self->{server} = $args{sock};
92             $self->{builder} = sub {
93 0     0   0 my ($self) = @_;
94             $self->_maybe_enable_timeouts(
95             IO::Socket::UNIX->new(
96             Peer => $self->{server},
97 0 0       0 ( $self->{cnx_timeout} ? ( Timeout => $self->{cnx_timeout} ): () ),
98             )
99             );
100 0         0 };
101             } elsif ($args{sentinels}) {
102 0         0 $self->{sentinels} = $args{sentinels};
103              
104 0 0       0 ref $self->{sentinels} eq 'ARRAY'
105             or croak("'sentinels' param must be an ArrayRef");
106              
107             defined($self->{service} = $args{service})
108 0 0       0 or croak("Need 'service' name when using 'sentinels'!");
109              
110             $self->{builder} = sub {
111 0     0   0 my ($self) = @_;
112             # try to connect to a sentinel
113 0         0 my $status;
114 0         0 foreach my $sentinel_address (@{$self->{sentinels}}) {
  0         0  
115 0 0       0 my $sentinel = eval {
116             Redis::Sentinel->new(
117             server => $sentinel_address,
118             cnx_timeout => ( defined $self->{sentinels_cnx_timeout}
119             ? $self->{sentinels_cnx_timeout} : 0.1),
120             read_timeout => ( defined $self->{sentinels_read_timeout}
121             ? $self->{sentinels_read_timeout} : 1 ),
122             write_timeout => ( defined $self->{sentinels_write_timeout}
123 0 0       0 ? $self->{sentinels_write_timeout} : 1 ),
    0          
    0          
124             )
125             } or next;
126 0         0 my $server_address = $sentinel->get_service_address($self->{service});
127 0 0 0     0 defined $server_address
128             or $status ||= "Sentinels don't know this service",
129             next;
130 0 0       0 $server_address eq 'IDONTKNOW'
131             and $status = "service is configured in one Sentinel, but was never reached",
132             next;
133              
134             # we found the service, set the server
135 0         0 $self->{server} = $server_address;
136              
137 0 0       0 if (! $self->{no_sentinels_list_update} ) {
138             # move the elected sentinel at the front of the list and add
139             # additional sentinels
140 0         0 my $idx = 2;
141 0         0 my %h = ( ( map { $_ => $idx++ } @{$self->{sentinels}}),
  0         0  
  0         0  
142             $sentinel_address => 1,
143             );
144             $self->{sentinels} = [
145 0         0 ( sort { $h{$a} <=> $h{$b} } keys %h ), # sorted existing sentinels,
146 0         0 grep { ! $h{$_}; } # list of unknown
147 0         0 map { +{ @$_ }->{name}; } # names of
148             $sentinel->sentinel( # sentinels
149             sentinels => $self->{service} # for this service
150             )
151 0         0 ];
152             }
153              
154 0         0 my $socket_class;
155              
156             my %socket_args = (
157             PeerAddr => $server_address,
158 0 0       0 ( $self->{cnx_timeout} ? ( Timeout => $self->{cnx_timeout} ) : () ),
159             );
160              
161 0 0 0     0 if (exists $args{ssl} and $args{ssl}) {
162 0         0 croak("Redis client does not support SSL with Redis Sentinel yet");
163             }
164             else {
165 0         0 $self->{ssl} = 0;
166 0         0 $socket_class = 'IO::Socket::INET';
167             }
168              
169 0         0 return $self->_maybe_enable_timeouts(
170             $socket_class->new(%socket_args)
171             );
172             }
173 0   0     0 croak($status || "failed to connect to any of the sentinels");
174 0         0 };
175             } else {
176 2 50       10 $self->{server} = defined $args{server} ? $args{server} : '127.0.0.1:6379';
177             $self->{builder} = sub {
178 2     2   9 my ($self) = @_;
179              
180 2         4 my $socket_class;
181              
182             my %socket_args = (
183             PeerAddr => $self->{server},
184 2 50       29 ( $self->{cnx_timeout} ? ( Timeout => $self->{cnx_timeout} ) : () ),
185             );
186              
187 2 50 33     32 if (exists $args{ssl} and $args{ssl}) {
188 2         5 if ( ! SSL_AVAILABLE ) {
189             croak("IO::Socket::SSL is required for connecting to Redis using SSL");
190             }
191              
192 2         5 $self->{ssl} = 1;
193 2         3 $socket_class = 'IO::Socket::SSL';
194 2   50     15 $socket_args{SSL_verify_mode} = $args{SSL_verify_mode} // 1;
195             }
196             else {
197 0         0 $self->{ssl} = 0;
198 0         0 $socket_class = 'IO::Socket::INET';
199             }
200              
201 2         53 return $self->_maybe_enable_timeouts(
202             $socket_class->new(%socket_args)
203             );
204 2         89 };
205             }
206              
207 2         26 $self->{is_subscriber} = 0;
208 2         14 $self->{subscribers} = {};
209              
210 2 50       67 $self->connect unless $args{no_auto_connect_on_new};
211              
212 2         8 return $self;
213             }
214              
215 0     0 1 0 sub is_subscriber { $_[0]{is_subscriber} }
216              
217             sub select {
218 0     0 1 0 my $self = shift;
219 0         0 my $database = shift;
220              
221 0 0       0 croak( "Cannot select an undefined redis database" )
222             unless defined $database;
223             # don't want to send multiple select() back and forth
224 0 0 0     0 if (!defined $self->{current_database} or $self->{current_database} ne $database) {
225 0         0 my $ret = $self->__std_cmd('select', $database, @_);
226 0         0 $self->{current_database} = $database;
227 0         0 return $ret;
228             };
229 0         0 return "OK"; # emulate redis response as of 3.0.6 just in case anybody cares
230             }
231              
232             ### we don't want DESTROY to fallback into AUTOLOAD
233       0     sub DESTROY { }
234              
235              
236             ### Deal with common, general case, Redis commands
237             our $AUTOLOAD;
238              
239             sub AUTOLOAD {
240 1     1   2065 my $command = $AUTOLOAD;
241 1         14 $command =~ s/.*://;
242              
243 1     2   16 my $method = sub { shift->__std_cmd($command, @_) };
  2         2169  
244              
245             # Save this method for future calls
246 15     15   153 no strict 'refs';
  15         33  
  15         76826  
247 1         38 *$AUTOLOAD = $method;
248              
249 1         7 goto $method;
250             }
251              
252             sub __std_cmd {
253 2     2   8 my $self = shift;
254 2         4 my $command = shift;
255              
256 2         16 $self->__is_valid_command($command);
257              
258 2 50 33     35 my $cb = @_ && ref $_[-1] eq 'CODE' ? pop : undef;
259              
260             # If this is an EXEC command, in pipelined mode, and one of the commands
261             # executed in the transaction yields an error, we must collect all errors
262             # from that command, rather than throwing an exception immediately.
263 2         58 my $uc_command = uc($command);
264 2   33     7 my $collect_errors = $cb && $uc_command eq 'EXEC';
265              
266 2 50 33     35 if ($uc_command eq 'MULTI') {
    50          
    50          
    50          
267 0         0 $self->{__inside_transaction} = 1;
268             } elsif ($uc_command eq 'EXEC' || $uc_command eq 'DISCARD') {
269 0         0 delete $self->{__inside_transaction};
270 0         0 delete $self->{__inside_watch};
271             } elsif ($uc_command eq 'WATCH') {
272 0         0 $self->{__inside_watch} = 1;
273             } elsif ($uc_command eq 'UNWATCH') {
274 0         0 delete $self->{__inside_watch};
275             }
276              
277             ## Fast path, no reconnect;
278             $self->{reconnect}
279 2 50       12 or return $self->__run_cmd($command, $collect_errors, undef, $cb, @_);
280              
281 0         0 my @cmd_args = @_;
282             $self->__with_reconnect(
283             sub {
284 0     0   0 $self->__run_cmd($command, $collect_errors, undef, $cb, @cmd_args);
285             }
286 0         0 );
287             }
288              
289             sub __with_reconnect {
290 0     0   0 my ($self, $cb) = @_;
291              
292             ## Fast path, no reconnect
293             $self->{reconnect}
294 0 0       0 or return $cb->();
295              
296             return &try(
297             $cb,
298             catch {
299 0 0   0   0 ref($_) eq 'Redis::X::Reconnect'
300             or die $_;
301              
302             $self->{__inside_transaction} || $self->{__inside_watch}
303 0 0 0     0 and croak("reconnect disabled inside transaction or watch");
304              
305 0 0       0 scalar @{$self->{queue} || []} && $self->{conservative_reconnect}
306 0 0 0     0 and croak("reconnect disabled while responses are pending and conservative reconnect mode enabled");
307              
308 0         0 $self->connect;
309 0         0 $cb->();
310             }
311 0         0 );
312             }
313              
314             sub __run_cmd {
315 2     2   22 my ($self, $command, $collect_errors, $custom_decode, $cb, @args) = @_;
316              
317 2         5 my $ret;
318             my $wrapper = $cb && $custom_decode
319             ? sub {
320 0     0   0 my ($reply, $error) = @_;
321 0         0 $cb->(scalar $custom_decode->($reply), $error);
322             }
323             : $cb || sub {
324 1     1   3 my ($reply, $error) = @_;
325 1         3 croak "[$command] $error, " if defined $error;
326 1         2 $ret = $reply;
327 2 50 33     47 };
      50        
328              
329 2         26 $self->__send_command($command, @args);
330 2         3 push @{ $self->{queue} }, [$command, $wrapper, $collect_errors];
  2         10  
331              
332 2 50       7 return 1 if $cb;
333              
334 2         8 $self->wait_all_responses;
335             return
336 1 50 33     24 $custom_decode ? $custom_decode->($ret, !wantarray)
    50          
337             : wantarray && ref $ret eq 'ARRAY' ? @$ret
338             : $ret;
339             }
340              
341             sub wait_all_responses {
342 2     2 1 5 my ($self) = @_;
343              
344 2         4 my $queue = $self->{queue};
345 2         11 $self->wait_one_response while @$queue;
346              
347 1         3 return;
348             }
349              
350             sub wait_one_response {
351 2     2 1 4 my ($self) = @_;
352              
353 2         4 my $handler = shift @{ $self->{queue} };
  2         4  
354 2 50       7 return unless $handler;
355              
356 2         6 my ($command, $cb, $collect_errors) = @$handler;
357 2         17 $cb->($self->__read_response($command, $collect_errors));
358              
359 1         4 return;
360             }
361              
362              
363             ### Commands with extra logic
364             sub quit {
365 0     0 1 0 my ($self) = @_;
366 0 0       0 return unless $self->{sock};
367              
368 0 0 0     0 croak "[quit] only works in synchronous mode, "
369             if @_ && ref $_[-1] eq 'CODE';
370              
371             try {
372 0     0   0 $self->wait_all_responses;
373 0         0 $self->__send_command('QUIT');
374 0         0 };
375              
376 0         0 $self->__close_sock();
377              
378 0         0 return 1;
379             }
380              
381             sub shutdown {
382 0     0 1 0 my ($self) = @_;
383 0         0 $self->__is_valid_command('SHUTDOWN');
384              
385 0 0 0     0 croak "[shutdown] only works in synchronous mode, "
386             if @_ && ref $_[-1] eq 'CODE';
387              
388 0 0       0 return unless $self->{sock};
389              
390 0         0 $self->wait_all_responses;
391 0         0 $self->__send_command('SHUTDOWN');
392 0 0       0 $self->__close_sock() || croak("Can't close socket: $!");
393              
394 0         0 return 1;
395             }
396              
397             sub ping {
398 0     0 1 0 my $self = shift;
399 0         0 $self->__is_valid_command('PING');
400              
401 0 0 0     0 croak "[ping] only works in synchronous mode, "
402             if @_ && ref $_[-1] eq 'CODE';
403              
404 0 0       0 return unless defined $self->{sock};
405              
406 0         0 $self->wait_all_responses;
407             return scalar try {
408 0     0   0 $self->__std_cmd('PING');
409             }
410             catch {
411 0     0   0 $self->__close_sock();
412 0         0 return;
413 0         0 };
414             }
415              
416             sub info {
417 0     0 1 0 my $self = shift;
418 0         0 $self->__is_valid_command('INFO');
419              
420             my $custom_decode = sub {
421 0     0   0 my ($reply) = @_;
422 0 0 0     0 return $reply if !defined $reply || ref $reply;
423 0         0 return { map { split(/:/, $_, 2) } grep {/^[^#]/} split(/\r\n/, $reply) };
  0         0  
  0         0  
424 0         0 };
425              
426 0 0 0     0 my $cb = @_ && ref $_[-1] eq 'CODE' ? pop : undef;
427              
428             ## Fast path, no reconnect
429             return $self->__run_cmd('INFO', 0, $custom_decode, $cb, @_)
430 0 0       0 unless $self->{reconnect};
431              
432 0         0 my @cmd_args = @_;
433             $self->__with_reconnect(
434             sub {
435 0     0   0 $self->__run_cmd('INFO', 0, $custom_decode, $cb, @cmd_args);
436             }
437 0         0 );
438             }
439              
440             sub keys {
441 0     0 1 0 my $self = shift;
442 0         0 $self->__is_valid_command('KEYS');
443              
444             my $custom_decode = sub {
445 0     0   0 my ($reply, $synchronous_scalar) = @_;
446              
447             ## Support redis <= 1.2.6
448 0 0 0     0 $reply = [split(/\s/, $reply)] if defined $reply && !ref $reply;
449              
450 0 0 0     0 return ref $reply && ($synchronous_scalar || wantarray) ? @$reply : $reply;
451 0         0 };
452              
453 0 0 0     0 my $cb = @_ && ref $_[-1] eq 'CODE' ? pop : undef;
454              
455             ## Fast path, no reconnect
456             return $self->__run_cmd('KEYS', 0, $custom_decode, $cb, @_)
457 0 0       0 unless $self->{reconnect};
458              
459 0         0 my @cmd_args = @_;
460             $self->__with_reconnect(
461             sub {
462 0     0   0 $self->__run_cmd('KEYS', 0, $custom_decode, $cb, @cmd_args);
463             }
464 0         0 );
465             }
466              
467              
468             ### PubSub
469             sub wait_for_messages {
470 0     0 1 0 my ($self, $timeout) = @_;
471              
472 0         0 my $s = IO::Select->new;
473              
474 0         0 my $count = 0;
475              
476              
477 0         0 my $e;
478              
479             try {
480             $self->__with_reconnect( sub {
481              
482             # the socket can be changed due to reconnection, so get it each time
483 0         0 my $sock = $self->{sock};
484 0         0 $s->remove($s->handles);
485 0         0 $s->add($sock);
486              
487 0         0 while ($s->can_read($timeout)) {
488 0         0 my $has_stuff = $self->__try_read_sock($sock);
489             # If the socket is ready to read but there is nothing to read, ( so
490             # it's an EOF ), try to reconnect.
491 0 0       0 defined $has_stuff
492             or $self->__throw_reconnect('EOF from server');
493              
494 0         0 my $cond;
495              
496 0 0       0 if ( ! $self->{ssl} ) {
497             $cond = sub {
498             # if __try_read_sock() return 0 (no data)
499             # or undef ( socket became EOF), back to select until timeout
500 0   0     0 return $self->{__buf} || $self->__try_read_sock($sock);
501             }
502 0         0 } else {
503             $cond = sub {
504             # continue if there is still some data left. If the buffer is
505             # larger than 16K, there won't be any pending data left though
506 0   0     0 return $self->{__buf} || $sock->pending;
507             }
508 0         0 }
509              
510 0         0 do {
511 0         0 my ($reply, $error) = $self->__read_response('WAIT_FOR_MESSAGES');
512 0 0       0 croak "[WAIT_FOR_MESSAGES] $error, " if defined $error;
513 0         0 $self->__process_pubsub_msg($reply);
514 0         0 $count++;
515              
516             } while ($cond->());
517             }
518              
519 0     0   0 });
520              
521             } catch {
522 0     0   0 $e = $_;
523 0         0 };
524              
525             # if We had an error and it was not an EOF, die
526 0 0 0     0 defined $e && $e ne 'EOF from server'
527             and die $e;
528              
529 0         0 return $count;
530             }
531              
532             sub __subscription_cmd {
533 0     0   0 my $self = shift;
534 0         0 my $pr = shift;
535 0         0 my $unsub = shift;
536 0         0 my $command = shift;
537 0         0 my $cb = pop;
538              
539 0 0       0 croak("Missing required callback in call to $command(), ")
540             unless ref($cb) eq 'CODE';
541              
542 0         0 $self->wait_all_responses;
543              
544 0         0 my @subs = @_;
545             $self->__with_reconnect(
546             sub {
547             $self->__throw_reconnect('Not connected to any server')
548 0 0   0   0 unless $self->{sock};
549              
550 0 0       0 @subs = $self->__process_unsubscribe_requests($cb, $pr, @subs)
551             if $unsub;
552 0 0       0 return unless @subs;
553              
554 0         0 $self->__send_command($command, @subs);
555              
556 0         0 my %cbs = map { ("${pr}message:$_" => $cb) } @subs;
  0         0  
557 0         0 return $self->__process_subscription_changes($command, \%cbs);
558             }
559 0         0 );
560             }
561              
562 0     0 1 0 sub subscribe { shift->__subscription_cmd('', 0, subscribe => @_) }
563 0     0 1 0 sub psubscribe { shift->__subscription_cmd('p', 0, psubscribe => @_) }
564 0     0 1 0 sub unsubscribe { shift->__subscription_cmd('', 1, unsubscribe => @_) }
565 0     0 1 0 sub punsubscribe { shift->__subscription_cmd('p', 1, punsubscribe => @_) }
566              
567             sub __process_unsubscribe_requests {
568 0     0   0 my ($self, $cb, $pr, @unsubs) = @_;
569 0         0 my $subs = $self->{subscribers};
570              
571 0         0 my @subs_to_unsubscribe;
572 0         0 for my $sub (@unsubs) {
573 0         0 my $key = "${pr}message:$sub";
574 0         0 my $cbs = $subs->{$key} = [grep { $_ ne $cb } @{ $subs->{$key} }];
  0         0  
  0         0  
575 0 0       0 next if @$cbs;
576              
577 0         0 delete $subs->{$key};
578 0         0 push @subs_to_unsubscribe, $sub;
579             }
580              
581 0         0 return @subs_to_unsubscribe;
582             }
583              
584             sub __process_subscription_changes {
585 0     0   0 my ($self, $cmd, $expected) = @_;
586 0         0 my $subs = $self->{subscribers};
587              
588 0         0 while (%$expected) {
589 0         0 my ($m, $error) = $self->__read_response($cmd);
590 0 0       0 croak "[$cmd] $error, " if defined $error;
591              
592             ## Deal with pending PUBLISH'ed messages
593 0 0       0 if ($m->[0] =~ /^p?message$/) {
594 0         0 $self->__process_pubsub_msg($m);
595 0         0 next;
596             }
597              
598 0         0 my ($key, $unsub) = $m->[0] =~ m/^(p)?(un)?subscribe$/;
599 0         0 $key .= "message:$m->[1]";
600 0         0 my $cb = delete $expected->{$key};
601              
602 0 0       0 push @{ $subs->{$key} }, $cb unless $unsub;
  0         0  
603              
604 0         0 $self->{is_subscriber} = $m->[2];
605             }
606             }
607              
608             sub __process_pubsub_msg {
609 0     0   0 my ($self, $m) = @_;
610 0         0 my $subs = $self->{subscribers};
611              
612 0         0 my $sub = $m->[1];
613 0         0 my $cbid = "$m->[0]:$sub";
614 0         0 my $data = pop @$m;
615 0 0       0 my $topic = defined $m->[2] ? $m->[2] : $sub;
616              
617 0 0       0 if (!defined $subs->{$cbid}) {
618 0         0 warn "Message for topic '$topic' ($cbid) without expected callback, ";
619 0         0 return;
620             }
621              
622 0         0 $_->($data, $topic, $sub) for @{ $subs->{$cbid} };
  0         0  
623              
624 0         0 return 1;
625              
626             }
627              
628              
629             ### Mode validation
630             sub __is_valid_command {
631 2     2   6 my ($self, $cmd) = @_;
632              
633             croak("Cannot use command '$cmd' while in SUBSCRIBE mode, ")
634 2 50       13 if $self->{is_subscriber};
635             }
636              
637              
638             ### Socket operations
639             sub connect {
640 2     2 1 9 my ($self) = @_;
641 2         12 delete $self->{sock};
642 2         5 delete $self->{__inside_watch};
643 2         4 delete $self->{__inside_transaction};
644              
645             # Suppose we have at least one command response pending, but we're about
646             # to reconnect. The new connection will never get a response to any of
647             # the pending commands, so delete all those pending responses now.
648 2         28 $self->{queue} = [];
649 2         35 $self->{pid} = $$;
650              
651             ## Fast path, no reconnect
652 2 50       27 return $self->__build_sock() unless $self->{reconnect};
653              
654             ## Use precise timers on reconnections
655 0         0 require Time::HiRes;
656 0         0 my $t0 = [Time::HiRes::gettimeofday()];
657              
658             ## Reconnect...
659 0         0 while (1) {
660 0         0 eval { $self->__build_sock };
  0         0  
661              
662 0 0       0 last unless $@; ## Connected!
663 0 0       0 die if Time::HiRes::tv_interval($t0) > $self->{reconnect}; ## Timeout
664 0         0 Time::HiRes::usleep($self->{every}); ## Retry in...
665             }
666              
667 0         0 return;
668             }
669              
670             sub __build_sock {
671 2     2   18 my ($self) = @_;
672              
673 2   33     17 $self->{sock} = $self->{builder}->($self)
674             || croak("Could not connect to Redis server at $self->{server}: $!");
675              
676 2         29 $self->{__buf} = '';
677              
678 2 50       10 if (defined $self->{password}) {
679 0     0   0 try { $self->auth($self->{password}) }
680             catch {
681 0     0   0 my $error = $_;
682 0         0 $self->{reconnect} = 0;
683 0         0 croak('Redis server authentication error: ' . $error);
684 0         0 };
685             }
686              
687 2         24 $self->__on_connection;
688              
689 2         4 return;
690             }
691              
692             sub __close_sock {
693 1     1   15 my ($self) = @_;
694 1         23 $self->{__buf} = '';
695 1         14 delete $self->{__inside_watch};
696 1         13 delete $self->{__inside_transaction};
697 1 50       26 defined $self->{sock} or return 1;
698 1         38 return close(delete $self->{sock});
699             }
700              
701             sub __on_connection {
702              
703 2     2   8 my ($self) = @_;
704              
705             # If we are in PubSub mode we shouldn't perform any command besides
706             # (p)(un)subscribe
707 2 50       14 if (! $self->{is_subscriber}) {
708             defined $self->{name}
709             and try {
710 0     0   0 my $n = $self->{name};
711 0 0       0 $n = $n->($self) if ref($n) eq 'CODE';
712 0 0       0 $self->client_setname($n) if defined $n;
713 2 50       9 };
714              
715             # don't use select() function as it's caching database name,
716             # rather call select directly
717             defined $self->{current_database}
718 2 50       17 and $self->__std_cmd('select', $self->{current_database});
719             }
720              
721 2         5 foreach my $topic (CORE::keys(%{$self->{subscribers}})) {
  2         25  
722 0 0       0 if ($topic =~ /(p?message):(.*)$/ ) {
723 0         0 my ($key, $channel) = ($1, $2);
724 0 0       0 if ($key eq 'message') {
725 0         0 $self->__send_command('subscribe', $channel);
726 0         0 my (undef, $error) = $self->__read_response('subscribe');
727 0 0       0 defined $error
728             and croak "[subscribe] $error";
729             } else {
730 0         0 $self->__send_command('psubscribe', $channel);
731 0         0 my (undef, $error) = $self->__read_response('psubscribe');
732 0 0       0 defined $error
733             and croak "[psubscribe] $error";
734             }
735             }
736             }
737              
738             defined $self->{on_connect}
739 2 50       12 and $self->{on_connect}->($self);
740              
741             }
742              
743              
744             sub __send_command {
745 2     2   5 my $self = shift;
746 2         5 my $cmd = uc(shift);
747 2         5 my $deb = $self->{debug};
748              
749             # if already connected but after a fork, reconnect
750 2 50 50     31 if ($self->{sock} && ($self->{pid} || 0) != $$) {
      33        
751 0         0 $self->connect;
752             }
753              
754             my $sock = $self->{sock}
755 2   33     7 || $self->__throw_reconnect('Not connected to any server');
756              
757 2 50       33 warn "[SEND] $cmd ", Dumper([@_]) if $deb;
758              
759             ## Encode command using multi-bulk format
760 2         18 my @cmd = split /_/, $cmd;
761 2         8 my $n_elems = scalar(@_) + scalar(@cmd);
762 2         6 my $buf = "\*$n_elems\r\n";
763 2         22 for my $bin (@cmd, @_) {
764 4 50       16 utf8::downgrade($bin, 1)
765             or croak "command sent is not an octet sequence in the native encoding (Latin-1). Consider using debug mode to see the command itself.";
766 4 50       17 $buf .= defined($bin) ? '$' . length($bin) . "\r\n$bin\r\n" : "\$-1\r\n";
767             }
768              
769             # this function works differently with a SSL socket cause it's not
770             # possible to read just a few bytes from a TLS frame.
771 2         49 my $status = $self->__try_read_sock($sock);
772 2 50       7 $self->__throw_reconnect('Not connected to any server')
773             unless defined $status;
774              
775             ## Send command, take care of partial writes
776 2 50       6 warn "[SEND RAW] $buf" if $deb;
777 2         6 while ($buf) {
778 2         28 my $len = syswrite $sock, $buf, length $buf;
779 2 50       309 $self->__throw_reconnect("Could not write to Redis server: $!")
780             unless defined $len;
781 2         12 substr $buf, 0, $len, "";
782             }
783              
784 2         8 return;
785             }
786              
787             sub __read_response {
788 2     2   8 my ($self, $cmd, $collect_errors) = @_;
789              
790 2 50       8 croak("Not connected to any server") unless $self->{sock};
791              
792 2         38 local $/ = "\r\n";
793              
794             ## no debug => fast path
795 2 50       20 return $self->__read_response_r($cmd, $collect_errors) unless $self->{debug};
796              
797 0         0 my ($result, $error) = $self->__read_response_r($cmd, $collect_errors);
798 0         0 warn "[RECV] $cmd ", Dumper($result, $error);
799 0         0 return $result, $error;
800             }
801              
802             sub __read_response_r {
803 2     2   7 my ($self, $command, $collect_errors) = @_;
804              
805 2         5 my ($type, $result) = $self->__read_line;
806              
807 1 50 33     7 if ($type eq '-') {
    50          
    0          
    0          
808 0         0 return undef, $result;
809             }
810             elsif ($type eq '+' || $type eq ':') {
811 1         8 return $result, undef;
812             }
813             elsif ($type eq '$') {
814 0 0       0 return undef, undef if $result < 0;
815 0         0 return $self->__read_len($result + 2), undef;
816             }
817             elsif ($type eq '*') {
818 0 0       0 return undef, undef if $result < 0;
819              
820 0         0 my @list;
821 0         0 while ($result--) {
822 0         0 my @nested = $self->__read_response_r($command, $collect_errors);
823 0 0       0 if ($collect_errors) {
824 0         0 push @list, \@nested;
825             }
826             else {
827 0 0       0 croak "[$command] $nested[1], " if defined $nested[1];
828 0         0 push @list, $nested[0];
829             }
830             }
831 0         0 return \@list, undef;
832             }
833             else {
834 0         0 croak "unknown answer type: $type ($result), ";
835             }
836             }
837              
838             sub __read_line {
839 2     2   4 my $self = $_[0];
840 2         4 my $sock = $self->{sock};
841              
842 2         6 my $data = $self->__read_line_raw;
843 2 100       9 if (! defined $data) {
844             # In case the caller catches the exception and wants to persist in using
845             # the Redis connection, let's forbid that.
846 1         35 $self->__close_sock();
847 1         1134 croak("Error while reading from Redis server: $!")
848             }
849              
850 1         3 chomp $data;
851 1 50       4 warn "[RECV RAW] '$data'" if $self->{debug};
852              
853 1         3 my $type = substr($data, 0, 1, '');
854 1         4 return ($type, $data);
855             }
856              
857             sub __read_line_raw {
858 2     2   4 my $self = $_[0];
859 2         2 my $sock = $self->{sock};
860 2         7 my $buf = \$self->{__buf};
861              
862 2 50       7 if (length $$buf) {
863 0         0 my $idx = index($$buf, "\r\n");
864 0 0       0 $idx >= 0 and return substr($$buf, 0, $idx + 2, '');
865             }
866              
867 2         4 while (1) {
868 2         15 my $bytes = sysread($sock, $$buf, BUFSIZE, length($$buf));
869 2 50 66     1027002 next if !defined $bytes && $! == EINTR;
870 2 100 66     41 return unless defined $bytes && $bytes;
871              
872             # start looking for \r\n where we stopped last time
873             # subtracting one is required to handle the corner case
874             # where \r and \n are split and therefore read by two consecutive sysreads
875 1         4 my $idx = index($$buf, "\r\n", length($$buf) - $bytes - 1);
876 1 50       8 $idx >= 0 and return substr($$buf, 0, $idx + 2, '');
877             }
878             }
879              
880             sub __read_len {
881 0     0   0 my ($self, $len) = @_;
882 0         0 my $buf = \$self->{__buf};
883 0         0 my $buflen = length($$buf);
884              
885 0 0       0 if ($buflen < $len) {
886 0         0 my $to_read = $len - $buflen;
887 0         0 while ($to_read > 0) {
888 0         0 my $bytes = sysread($self->{sock}, $$buf, BUFSIZE, length($$buf));
889 0 0 0     0 next if !defined $bytes && $! == EINTR;
890 0 0       0 croak("Error while reading from Redis server: $!") unless defined $bytes;
891 0 0       0 croak("Redis server closed connection") unless $bytes;
892 0         0 $to_read -= $bytes;
893             }
894             }
895              
896 0         0 my $data = substr($$buf, 0, $len, '');
897 0         0 chomp $data;
898 0 0       0 warn "[RECV RAW] '$data'" if $self->{debug};
899              
900 0         0 return $data;
901             }
902              
903             sub __try_read_sock {
904 2     2   8 my ($self, $sock) = @_;
905 2         19 my $data = '';
906              
907 2         5 while (1) {
908             # WIN32 doesn't support MSG_DONTWAIT,
909             # need to switch fh to nonblocking mode manually.
910             # For Unix still use MSG_DONTWAIT because of fewer syscalls
911 2         4 my ($res, $err);
912 2 50       35 if (WIN32) {
913 0         0 __fh_nonblocking_win32($sock, 1);
914 0         0 $res = recv($sock, $data, BUFSIZE, 0);
915 0         0 $err = 0 + $!;
916 0         0 __fh_nonblocking_win32($sock, 0);
917             } else {
918 2 50       17 if ($self->{ssl}) {
919             ## use peek to see if there is any data available instead of reading
920             ## it because it's not possible to read only a few bytes from an SSL
921             ## frame. This does not work on WIN32
922 2         33 $sock->blocking(0);
923 2         147 $res = $sock->peek($data, BUFSIZE);
924 2         267 $sock->blocking(1);
925             } else {
926 0         0 $res = recv($sock, $data, BUFSIZE, MSG_DONTWAIT);
927             }
928              
929 2         41 $err = 0 + $!;
930             }
931              
932 2 50       7 if (defined $res) {
933             ## have read some data
934 0 0       0 if (length($data)) {
935 0 0       0 $self->{__buf} .= $data unless $self->{ssl};
936 0         0 return 1;
937             }
938              
939             ## no data but also no error means EOF
940 0         0 return;
941             }
942              
943 2 50 33     33 next if $err && $err == EINTR;
944              
945             ## Keep going if nothing there, but socket is alive
946 2 50 33     27 return 0 if $err and ($err == EWOULDBLOCK or $err == EAGAIN);
      33        
947              
948             ## if we got ECONNRESET, it might be due to a timeout from the other side (on FreeBSD)
949             ## or because an intermediate proxy shut down our connection using its internal timeout counter
950 0 0 0       return 0 if ($err && $err == ECONNRESET);
951              
952             ## result is undef but err is 0? should never happen
953 0 0         return if $err == 0;
954              
955             ## For everything else, there is Mastercard...
956 0           croak("Unexpected error condition $err/$^O, please report this as a bug");
957             }
958             }
959              
960             ## Copied from AnyEvent::Util
961             sub __fh_nonblocking_win32 {
962 0     0     ioctl $_[0], 0x8004667e, pack "L", $_[1];
963             }
964              
965             ##########################
966             # I take exception to that
967              
968             sub __throw_reconnect {
969 0     0     my ($self, $m) = @_;
970 0 0         die bless(\$m, 'Redis::X::Reconnect') if $self->{reconnect};
971 0           die $m;
972             }
973              
974              
975             1; # End of Redis.pm
976              
977             __END__