File Coverage

blib/lib/Redis.pm
Criterion Covered Total %
statement 211 474 44.5
branch 51 244 20.9
condition 31 127 24.4
subroutine 42 81 51.8
pod 16 16 100.0
total 351 942 37.2


line stmt bran cond sub pod time code
1             #
2             # This file is part of Redis
3             #
4             # This software is Copyright (c) 2015 by Pedro Melo, Damien Krotkine.
5             #
6             # This is free software, licensed under:
7             #
8             # The Artistic License 2.0 (GPL Compatible)
9             #
10             package Redis;
11             $Redis::VERSION = '1.998'; # TRIAL
12             # ABSTRACT: Perl binding for Redis database
13             # VERSION
14             # AUTHORITY
15              
16 15     15   969371 use warnings;
  15         159  
  15         523  
17 15     15   79 use strict;
  15         29  
  15         342  
18              
19 15     15   7922 use IO::Socket::INET;
  15         325949  
  15         97  
20 15     15   6510 use IO::Socket::UNIX;
  15         33  
  15         100  
21 15     15   19134 use IO::Socket::Timeout;
  15         92300  
  15         117  
22 15     15   7676 use IO::Select;
  15         24903  
  15         716  
23 15     15   112 use IO::Handle;
  15         33  
  15         533  
24 15     15   80 use Fcntl qw( O_NONBLOCK F_SETFL );
  15         30  
  15         646  
25 15     15   76 use Errno ();
  15         29  
  15         229  
26 15     15   9512 use Data::Dumper;
  15         104875  
  15         956  
27 15     15   116 use Carp;
  15         31  
  15         737  
28 15     15   3051 use Try::Tiny;
  15         12553  
  15         775  
29 15     15   96 use Scalar::Util ();
  15         29  
  15         249  
30              
31 15     15   6602 use Redis::Sentinel;
  15         40  
  15         707  
32              
33 15     15   117 use constant WIN32 => $^O =~ /mswin32/i;
  15         28  
  15         1277  
34 15   50 15   101 use constant EWOULDBLOCK => eval {Errno::EWOULDBLOCK} || -1E9;
  15         40  
  15         34  
35 15   50 15   101 use constant EAGAIN => eval {Errno::EAGAIN} || -1E9;
  15         27  
  15         323  
36 15   50 15   104 use constant EINTR => eval {Errno::EINTR} || -1E9;
  15         27  
  15         23  
37 15   50 15   100 use constant ECONNRESET => eval {Errno::ECONNRESET} || -1E9;
  15         27  
  15         34  
38 15     15   90 use constant BUFSIZE => 4096;
  15         39  
  15         18811  
39              
40             sub _maybe_enable_timeouts {
41 2     2   1117 my ($self, $socket) = @_;
42 2 50       15 $socket or return;
43             defined $self->{read_timeout} || defined $self->{write_timeout}
44 2 50 33     10 or return $socket;
45 2         71 IO::Socket::Timeout->enable_timeouts_on($socket);
46             defined $self->{read_timeout}
47 2 50       822 and $socket->read_timeout($self->{read_timeout});
48             defined $self->{write_timeout}
49 2 50       182 and $socket->write_timeout($self->{write_timeout});
50 2         10 $socket;
51             }
52              
53             sub new {
54 2     2 1 21660 my ($class, %args) = @_;
55 2         31 my $self = bless {}, $class;
56              
57 2         53 $self->{__buf} = '';
58 2   33     69 $self->{debug} = $args{debug} || $ENV{REDIS_DEBUG};
59              
60             ## Deal with REDIS_SERVER ENV
61 2 0 33     72 if ($ENV{REDIS_SERVER} && ! defined $args{sock} && ! defined $args{server} && ! defined $args{sentinels}) {
      33        
      0        
62 0 0       0 if ($ENV{REDIS_SERVER} =~ m!^/!) {
    0          
    0          
63 0         0 $args{sock} = $ENV{REDIS_SERVER};
64             }
65             elsif ($ENV{REDIS_SERVER} =~ m!^unix:(.+)!) {
66 0         0 $args{sock} = $1;
67             }
68             elsif ($ENV{REDIS_SERVER} =~ m!^(?:tcp:)?(.+)!) {
69 0         0 $args{server} = $1;
70             }
71             }
72              
73             defined $args{$_}
74 2   66     39 and $self->{$_} = $args{$_} for
75             qw(password on_connect name no_auto_connect_on_new cnx_timeout
76             write_timeout read_timeout sentinels_cnx_timeout sentinels_write_timeout
77             sentinels_read_timeout no_sentinels_list_update);
78              
79 2   50     31 $self->{reconnect} = $args{reconnect} || 0;
80 2   50     33 $self->{conservative_reconnect} = $args{conservative_reconnect} || 0;
81 2   50     19 $self->{every} = $args{every} || 1000;
82              
83 2 50       15 if (defined $args{sock}) {
    50          
84 0         0 $self->{server} = $args{sock};
85             $self->{builder} = sub {
86 0     0   0 my ($self) = @_;
87             $self->_maybe_enable_timeouts(
88             IO::Socket::UNIX->new(
89             Peer => $self->{server},
90 0 0       0 ( $self->{cnx_timeout} ? ( Timeout => $self->{cnx_timeout} ): () ),
91             )
92             );
93 0         0 };
94             } elsif ($args{sentinels}) {
95 0         0 $self->{sentinels} = $args{sentinels};
96              
97 0 0       0 ref $self->{sentinels} eq 'ARRAY'
98             or croak("'sentinels' param must be an ArrayRef");
99              
100             defined($self->{service} = $args{service})
101 0 0       0 or croak("Need 'service' name when using 'sentinels'!");
102              
103             $self->{builder} = sub {
104 0     0   0 my ($self) = @_;
105             # try to connect to a sentinel
106 0         0 my $status;
107 0         0 foreach my $sentinel_address (@{$self->{sentinels}}) {
  0         0  
108 0 0       0 my $sentinel = eval {
109             Redis::Sentinel->new(
110             server => $sentinel_address,
111             cnx_timeout => ( defined $self->{sentinels_cnx_timeout}
112             ? $self->{sentinels_cnx_timeout} : 0.1),
113             read_timeout => ( defined $self->{sentinels_read_timeout}
114             ? $self->{sentinels_read_timeout} : 1 ),
115             write_timeout => ( defined $self->{sentinels_write_timeout}
116 0 0       0 ? $self->{sentinels_write_timeout} : 1 ),
    0          
    0          
117             )
118             } or next;
119 0         0 my $server_address = $sentinel->get_service_address($self->{service});
120 0 0 0     0 defined $server_address
121             or $status ||= "Sentinels don't know this service",
122             next;
123 0 0       0 $server_address eq 'IDONTKNOW'
124             and $status = "service is configured in one Sentinel, but was never reached",
125             next;
126              
127             # we found the service, set the server
128 0         0 $self->{server} = $server_address;
129              
130 0 0       0 if (! $self->{no_sentinels_list_update} ) {
131             # move the elected sentinel to the front of the list and add
132             # additional sentinels
133 0         0 my $idx = 2;
134 0         0 my %h = ( ( map { $_ => $idx++ } @{$self->{sentinels}}),
  0         0  
  0         0  
135             $sentinel_address => 1,
136             );
137             $self->{sentinels} = [
138 0         0 ( sort { $h{$a} <=> $h{$b} } keys %h ), # sorted existing sentinels,
139 0         0 grep { ! $h{$_}; } # list of unknown
140 0         0 map { +{ @$_ }->{name}; } # names of
141             $sentinel->sentinel( # sentinels
142             sentinels => $self->{service} # for this service
143             )
144 0         0 ];
145             }
146            
147             return $self->_maybe_enable_timeouts(
148             IO::Socket::INET->new(
149             PeerAddr => $server_address,
150             Proto => 'tcp',
151 0 0       0 ( $self->{cnx_timeout} ? ( Timeout => $self->{cnx_timeout} ) : () ),
152             )
153             );
154             }
155 0   0     0 croak($status || "failed to connect to any of the sentinels");
156 0         0 };
157             } else {
158 2 50       15 $self->{server} = defined $args{server} ? $args{server} : '127.0.0.1:6379';
159             $self->{builder} = sub {
160 2     2   4 my ($self) = @_;
161             $self->_maybe_enable_timeouts(
162             IO::Socket::INET->new(
163             PeerAddr => $self->{server},
164             Proto => 'tcp',
165 2 50       79 ( $self->{cnx_timeout} ? ( Timeout => $self->{cnx_timeout} ) : () ),
166             )
167             );
168 2         83 };
169             }
170              
171 2         7 $self->{is_subscriber} = 0;
172 2         5 $self->{subscribers} = {};
173              
174 2 50       74 $self->connect unless $args{no_auto_connect_on_new};
175              
176 2         9 return $self;
177             }
178              
179 0     0 1 0 sub is_subscriber { $_[0]{is_subscriber} }
180              
181             sub select {
182 0     0 1 0 my $self = shift;
183 0         0 my $database = shift;
184              
185 0 0       0 croak( "Cannot select an undefined redis database" )
186             unless defined $database;
187             # don't want to send multiple select() back and forth
188 0 0 0     0 if (!defined $self->{current_database} or $self->{current_database} ne $database) {
189 0         0 my $ret = $self->__std_cmd('select', $database, @_);
190 0         0 $self->{current_database} = $database;
191 0         0 return $ret;
192             };
193 0         0 return "OK"; # emulate redis response as of 3.0.6 just in case anybody cares
194             }
195              
196             ### we don't want DESTROY to fallback into AUTOLOAD
197       0     sub DESTROY { }
198              
199              
200             ### Deal with common, general case, Redis commands
201             our $AUTOLOAD;
202              
203             sub AUTOLOAD {
204 1     1   1953 my $command = $AUTOLOAD;
205 1         32 $command =~ s/.*://;
206              
207 1     2   15 my $method = sub { shift->__std_cmd($command, @_) };
  2         1968  
208              
209             # Save this method for future calls
210 15     15   167 no strict 'refs';
  15         50  
  15         72740  
211 1         38 *$AUTOLOAD = $method;
212              
213 1         6 goto $method;
214             }
215              
216             sub __std_cmd {
217 2     2   17 my $self = shift;
218 2         4 my $command = shift;
219              
220 2         8 $self->__is_valid_command($command);
221              
222 2 50 33     35 my $cb = @_ && ref $_[-1] eq 'CODE' ? pop : undef;
223              
224             # If this is an EXEC command, in pipelined mode, and one of the commands
225             # executed in the transaction yields an error, we must collect all errors
226             # from that command, rather than throwing an exception immediately.
227 2         13 my $uc_command = uc($command);
228 2   33     16 my $collect_errors = $cb && $uc_command eq 'EXEC';
229              
230 2 50 33     37 if ($uc_command eq 'MULTI') {
    50          
    50          
    50          
231 0         0 $self->{__inside_transaction} = 1;
232             } elsif ($uc_command eq 'EXEC' || $uc_command eq 'DISCARD') {
233 0         0 delete $self->{__inside_transaction};
234 0         0 delete $self->{__inside_watch};
235             } elsif ($uc_command eq 'WATCH') {
236 0         0 $self->{__inside_watch} = 1;
237             } elsif ($uc_command eq 'UNWATCH') {
238 0         0 delete $self->{__inside_watch};
239             }
240              
241             ## Fast path, no reconnect;
242             $self->{reconnect}
243 2 50       16 or return $self->__run_cmd($command, $collect_errors, undef, $cb, @_);
244              
245 0         0 my @cmd_args = @_;
246             $self->__with_reconnect(
247             sub {
248 0     0   0 $self->__run_cmd($command, $collect_errors, undef, $cb, @cmd_args);
249             }
250 0         0 );
251             }
252              
253             sub __with_reconnect {
254 0     0   0 my ($self, $cb) = @_;
255              
256             ## Fast path, no reconnect
257             $self->{reconnect}
258 0 0       0 or return $cb->();
259              
260             return &try(
261             $cb,
262             catch {
263 0 0   0   0 ref($_) eq 'Redis::X::Reconnect'
264             or die $_;
265              
266             $self->{__inside_transaction} || $self->{__inside_watch}
267 0 0 0     0 and croak("reconnect disabled inside transaction or watch");
268              
269 0 0       0 scalar @{$self->{queue} || []} && $self->{conservative_reconnect}
270 0 0 0     0 and croak("reconnect disabled while responses are pending and conservative reconnect mode enabled");
271              
272 0         0 $self->connect;
273 0         0 $cb->();
274             }
275 0         0 );
276             }
277              
278             sub __run_cmd {
279 2     2   14 my ($self, $command, $collect_errors, $custom_decode, $cb, @args) = @_;
280              
281 2         4 my $ret;
282             my $wrapper = $cb && $custom_decode
283             ? sub {
284 0     0   0 my ($reply, $error) = @_;
285 0         0 $cb->(scalar $custom_decode->($reply), $error);
286             }
287             : $cb || sub {
288 1     1   3 my ($reply, $error) = @_;
289 1         2 croak "[$command] $error, " if defined $error;
290 1         3 $ret = $reply;
291 2 50 33     54 };
      50        
292              
293 2         20 $self->__send_command($command, @args);
294 2         4 push @{ $self->{queue} }, [$command, $wrapper, $collect_errors];
  2         8  
295              
296 2 50       7 return 1 if $cb;
297              
298 2         33 $self->wait_all_responses;
299             return
300 1 50 33     12 $custom_decode ? $custom_decode->($ret, !wantarray)
    50          
301             : wantarray && ref $ret eq 'ARRAY' ? @$ret
302             : $ret;
303             }
304              
305             sub wait_all_responses {
306 2     2 1 7 my ($self) = @_;
307              
308 2         6 my $queue = $self->{queue};
309 2         24 $self->wait_one_response while @$queue;
310              
311 1         2 return;
312             }
313              
314             sub wait_one_response {
315 2     2 1 6 my ($self) = @_;
316              
317 2         4 my $handler = shift @{ $self->{queue} };
  2         5  
318 2 50       6 return unless $handler;
319              
320 2         13 my ($command, $cb, $collect_errors) = @$handler;
321 2         13 $cb->($self->__read_response($command, $collect_errors));
322              
323 1         4 return;
324             }
325              
326              
327             ### Commands with extra logic
328             sub quit {
329 0     0 1 0 my ($self) = @_;
330 0 0       0 return unless $self->{sock};
331              
332 0 0 0     0 croak "[quit] only works in synchronous mode, "
333             if @_ && ref $_[-1] eq 'CODE';
334              
335             try {
336 0     0   0 $self->wait_all_responses;
337 0         0 $self->__send_command('QUIT');
338 0         0 };
339              
340 0         0 $self->__close_sock();
341              
342 0         0 return 1;
343             }
344              
345             sub shutdown {
346 0     0 1 0 my ($self) = @_;
347 0         0 $self->__is_valid_command('SHUTDOWN');
348              
349 0 0 0     0 croak "[shutdown] only works in synchronous mode, "
350             if @_ && ref $_[-1] eq 'CODE';
351              
352 0 0       0 return unless $self->{sock};
353              
354 0         0 $self->wait_all_responses;
355 0         0 $self->__send_command('SHUTDOWN');
356 0 0       0 $self->__close_sock() || croak("Can't close socket: $!");
357              
358 0         0 return 1;
359             }
360              
361             sub ping {
362 0     0 1 0 my $self = shift;
363 0         0 $self->__is_valid_command('PING');
364              
365 0 0 0     0 croak "[ping] only works in synchronous mode, "
366             if @_ && ref $_[-1] eq 'CODE';
367              
368 0 0       0 return unless defined $self->{sock};
369              
370 0         0 $self->wait_all_responses;
371             return scalar try {
372 0     0   0 $self->__std_cmd('PING');
373             }
374             catch {
375 0     0   0 $self->__close_sock();
376 0         0 return;
377 0         0 };
378             }
379              
380             sub info {
381 0     0 1 0 my $self = shift;
382 0         0 $self->__is_valid_command('INFO');
383              
384             my $custom_decode = sub {
385 0     0   0 my ($reply) = @_;
386 0 0 0     0 return $reply if !defined $reply || ref $reply;
387 0         0 return { map { split(/:/, $_, 2) } grep {/^[^#]/} split(/\r\n/, $reply) };
  0         0  
  0         0  
388 0         0 };
389              
390 0 0 0     0 my $cb = @_ && ref $_[-1] eq 'CODE' ? pop : undef;
391              
392             ## Fast path, no reconnect
393             return $self->__run_cmd('INFO', 0, $custom_decode, $cb, @_)
394 0 0       0 unless $self->{reconnect};
395              
396 0         0 my @cmd_args = @_;
397             $self->__with_reconnect(
398             sub {
399 0     0   0 $self->__run_cmd('INFO', 0, $custom_decode, $cb, @cmd_args);
400             }
401 0         0 );
402             }
403              
404             sub keys {
405 0     0 1 0 my $self = shift;
406 0         0 $self->__is_valid_command('KEYS');
407              
408             my $custom_decode = sub {
409 0     0   0 my ($reply, $synchronous_scalar) = @_;
410              
411             ## Support redis <= 1.2.6
412 0 0 0     0 $reply = [split(/\s/, $reply)] if defined $reply && !ref $reply;
413              
414 0 0 0     0 return ref $reply && ($synchronous_scalar || wantarray) ? @$reply : $reply;
415 0         0 };
416              
417 0 0 0     0 my $cb = @_ && ref $_[-1] eq 'CODE' ? pop : undef;
418              
419             ## Fast path, no reconnect
420             return $self->__run_cmd('KEYS', 0, $custom_decode, $cb, @_)
421 0 0       0 unless $self->{reconnect};
422              
423 0         0 my @cmd_args = @_;
424             $self->__with_reconnect(
425             sub {
426 0     0   0 $self->__run_cmd('KEYS', 0, $custom_decode, $cb, @cmd_args);
427             }
428 0         0 );
429             }
430              
431              
432             ### PubSub
433             sub wait_for_messages {
434 0     0 1 0 my ($self, $timeout) = @_;
435              
436 0         0 my $s = IO::Select->new;
437              
438 0         0 my $count = 0;
439              
440              
441 0         0 my $e;
442              
443             try {
444             $self->__with_reconnect( sub {
445              
446             # the socket can be changed due to reconnection, so get it each time
447 0         0 my $sock = $self->{sock};
448 0         0 $s->remove($s->handles);
449 0         0 $s->add($sock);
450              
451 0         0 while ($s->can_read($timeout)) {
452 0         0 my $has_stuff = $self->__try_read_sock($sock);
453             # If the socket is ready to read but there is nothing to read (so
454             # it's an EOF), try to reconnect.
455 0 0       0 defined $has_stuff
456             or $self->__throw_reconnect('EOF from server');
457              
458             do {
459 0         0 my ($reply, $error) = $self->__read_response('WAIT_FOR_MESSAGES');
460 0 0       0 croak "[WAIT_FOR_MESSAGES] $error, " if defined $error;
461 0         0 $self->__process_pubsub_msg($reply);
462 0         0 $count++;
463              
464             # if __try_read_sock() returns 0 (no data)
465             # or undef (socket reached EOF), go back to select until timeout
466 0   0     0 } while ($self->{__buf} || $self->__try_read_sock($sock));
467             }
468            
469 0     0   0 });
470              
471             } catch {
472 0     0   0 $e = $_;
473 0         0 };
474              
475             # if we had an error and it was not an EOF, die
476 0 0 0     0 defined $e && $e ne 'EOF from server'
477             and die $e;
478              
479 0         0 return $count;
480             }
481              
482             sub __subscription_cmd {
483 0     0   0 my $self = shift;
484 0         0 my $pr = shift;
485 0         0 my $unsub = shift;
486 0         0 my $command = shift;
487 0         0 my $cb = pop;
488              
489 0 0       0 croak("Missing required callback in call to $command(), ")
490             unless ref($cb) eq 'CODE';
491              
492 0         0 $self->wait_all_responses;
493              
494 0         0 my @subs = @_;
495             $self->__with_reconnect(
496             sub {
497             $self->__throw_reconnect('Not connected to any server')
498 0 0   0   0 unless $self->{sock};
499              
500 0 0       0 @subs = $self->__process_unsubscribe_requests($cb, $pr, @subs)
501             if $unsub;
502 0 0       0 return unless @subs;
503              
504 0         0 $self->__send_command($command, @subs);
505              
506 0         0 my %cbs = map { ("${pr}message:$_" => $cb) } @subs;
  0         0  
507 0         0 return $self->__process_subscription_changes($command, \%cbs);
508             }
509 0         0 );
510             }
511              
512 0     0 1 0 sub subscribe { shift->__subscription_cmd('', 0, subscribe => @_) }
513 0     0 1 0 sub psubscribe { shift->__subscription_cmd('p', 0, psubscribe => @_) }
514 0     0 1 0 sub unsubscribe { shift->__subscription_cmd('', 1, unsubscribe => @_) }
515 0     0 1 0 sub punsubscribe { shift->__subscription_cmd('p', 1, punsubscribe => @_) }
516              
517             sub __process_unsubscribe_requests {
518 0     0   0 my ($self, $cb, $pr, @unsubs) = @_;
519 0         0 my $subs = $self->{subscribers};
520              
521 0         0 my @subs_to_unsubscribe;
522 0         0 for my $sub (@unsubs) {
523 0         0 my $key = "${pr}message:$sub";
524 0         0 my $cbs = $subs->{$key} = [grep { $_ ne $cb } @{ $subs->{$key} }];
  0         0  
  0         0  
525 0 0       0 next if @$cbs;
526              
527 0         0 delete $subs->{$key};
528 0         0 push @subs_to_unsubscribe, $sub;
529             }
530              
531 0         0 return @subs_to_unsubscribe;
532             }
533              
534             sub __process_subscription_changes {
535 0     0   0 my ($self, $cmd, $expected) = @_;
536 0         0 my $subs = $self->{subscribers};
537              
538 0         0 while (%$expected) {
539 0         0 my ($m, $error) = $self->__read_response($cmd);
540 0 0       0 croak "[$cmd] $error, " if defined $error;
541              
542             ## Deal with pending PUBLISH'ed messages
543 0 0       0 if ($m->[0] =~ /^p?message$/) {
544 0         0 $self->__process_pubsub_msg($m);
545 0         0 next;
546             }
547              
548 0         0 my ($key, $unsub) = $m->[0] =~ m/^(p)?(un)?subscribe$/;
549 0         0 $key .= "message:$m->[1]";
550 0         0 my $cb = delete $expected->{$key};
551              
552 0 0       0 push @{ $subs->{$key} }, $cb unless $unsub;
  0         0  
553              
554 0         0 $self->{is_subscriber} = $m->[2];
555             }
556             }
557              
558             sub __process_pubsub_msg {
559 0     0   0 my ($self, $m) = @_;
560 0         0 my $subs = $self->{subscribers};
561              
562 0         0 my $sub = $m->[1];
563 0         0 my $cbid = "$m->[0]:$sub";
564 0         0 my $data = pop @$m;
565 0 0       0 my $topic = defined $m->[2] ? $m->[2] : $sub;
566              
567 0 0       0 if (!defined $subs->{$cbid}) {
568 0         0 warn "Message for topic '$topic' ($cbid) without expected callback, ";
569 0         0 return;
570             }
571              
572 0         0 $_->($data, $topic, $sub) for @{ $subs->{$cbid} };
  0         0  
573              
574 0         0 return 1;
575              
576             }
577              
578              
579             ### Mode validation
580             sub __is_valid_command {
581 2     2   13 my ($self, $cmd) = @_;
582              
583             croak("Cannot use command '$cmd' while in SUBSCRIBE mode, ")
584 2 50       13 if $self->{is_subscriber};
585             }
586              
587              
588             ### Socket operations
589             sub connect {
590 2     2 1 8 my ($self) = @_;
591 2         20 delete $self->{sock};
592 2         6 delete $self->{__inside_watch};
593 2         17 delete $self->{__inside_transaction};
594              
595             # Suppose we have at least one command response pending, but we're about
596             # to reconnect. The new connection will never get a response to any of
597             # the pending commands, so delete all those pending responses now.
598 2         24 $self->{queue} = [];
599 2         35 $self->{pid} = $$;
600              
601             ## Fast path, no reconnect
602 2 50       30 return $self->__build_sock() unless $self->{reconnect};
603              
604             ## Use precise timers on reconnections
605 0         0 require Time::HiRes;
606 0         0 my $t0 = [Time::HiRes::gettimeofday()];
607              
608             ## Reconnect...
609 0         0 while (1) {
610 0         0 eval { $self->__build_sock };
  0         0  
611              
612 0 0       0 last unless $@; ## Connected!
613 0 0       0 die if Time::HiRes::tv_interval($t0) > $self->{reconnect}; ## Timeout
614 0         0 Time::HiRes::usleep($self->{every}); ## Retry in...
615             }
616              
617 0         0 return;
618             }
619              
620             sub __build_sock {
621 2     2   7 my ($self) = @_;
622              
623 2   33     8 $self->{sock} = $self->{builder}->($self)
624             || croak("Could not connect to Redis server at $self->{server}: $!");
625              
626 2         5 $self->{__buf} = '';
627              
628 2 50       24 if (defined $self->{password}) {
629 0     0   0 try { $self->auth($self->{password}) }
630             catch {
631 0     0   0 my $error = $_;
632 0         0 $self->{reconnect} = 0;
633 0         0 croak('Redis server authentication error: ' . $error);
634 0         0 };
635             }
636              
637 2         25 $self->__on_connection;
638              
639 2         5 return;
640             }
641              
642             sub __close_sock {
643 1     1   20 my ($self) = @_;
644 1         9 $self->{__buf} = '';
645 1         4 delete $self->{__inside_watch};
646 1         3 delete $self->{__inside_transaction};
647 1 50       5 defined $self->{sock} or return 1;
648 1         107 return close(delete $self->{sock});
649             }
650              
651             sub __on_connection {
652              
653 2     2   6 my ($self) = @_;
654              
655             # If we are in PubSub mode we shouldn't perform any command besides
656             # (p)(un)subscribe
657 2 50       12 if (! $self->{is_subscriber}) {
658             defined $self->{name}
659             and try {
660 0     0   0 my $n = $self->{name};
661 0 0       0 $n = $n->($self) if ref($n) eq 'CODE';
662 0 0       0 $self->client_setname($n) if defined $n;
663 2 50       16 };
664              
665             # don't use select() function as it's caching database name,
666             # rather call select directly
667             defined $self->{current_database}
668 2 50       18 and $self->__std_cmd('select', $self->{current_database});
669             }
670              
671 2         13 foreach my $topic (CORE::keys(%{$self->{subscribers}})) {
  2         24  
672 0 0       0 if ($topic =~ /(p?message):(.*)$/ ) {
673 0         0 my ($key, $channel) = ($1, $2);
674 0 0       0 if ($key eq 'message') {
675 0         0 $self->__send_command('subscribe', $channel);
676 0         0 my (undef, $error) = $self->__read_response('subscribe');
677 0 0       0 defined $error
678             and croak "[subscribe] $error";
679             } else {
680 0         0 $self->__send_command('psubscribe', $channel);
681 0         0 my (undef, $error) = $self->__read_response('psubscribe');
682 0 0       0 defined $error
683             and croak "[psubscribe] $error";
684             }
685             }
686             }
687              
688             defined $self->{on_connect}
689 2 50       11 and $self->{on_connect}->($self);
690              
691             }
692              
693              
694             sub __send_command {
695 2     2   4 my $self = shift;
696 2         4 my $cmd = uc(shift);
697 2         5 my $deb = $self->{debug};
698              
699             # if already connected but after a fork, reconnect
700 2 50 50     32 if ($self->{sock} && ($self->{pid} || 0) != $$) {
      33        
701 0         0 $self->connect;
702             }
703              
704             my $sock = $self->{sock}
705 2   33     6 || $self->__throw_reconnect('Not connected to any server');
706              
707 2 50       5 warn "[SEND] $cmd ", Dumper([@_]) if $deb;
708              
709             ## Encode command using multi-bulk format
710 2         8 my @cmd = split /_/, $cmd;
711 2         6 my $n_elems = scalar(@_) + scalar(@cmd);
712 2         6 my $buf = "\*$n_elems\r\n";
713 2         21 for my $bin (@cmd, @_) {
714 4 50       16 utf8::downgrade($bin, 1)
715             or croak "command sent is not an octet sequence in the native encoding (Latin-1). Consider using debug mode to see the command itself.";
716 4 50       17 $buf .= defined($bin) ? '$' . length($bin) . "\r\n$bin\r\n" : "\$-1\r\n";
717             }
718              
719             ## Check to see if socket was closed: reconnect on EOF
720 2         89 my $status = $self->__try_read_sock($sock);
721 2 50       7 $self->__throw_reconnect('Not connected to any server')
722             unless defined $status;
723              
724             ## Send command, take care for partial writes
725 2 50       4 warn "[SEND RAW] $buf" if $deb;
726 2         5 while ($buf) {
727 2         131 my $len = syswrite $sock, $buf, length $buf;
728 2 50       10 $self->__throw_reconnect("Could not write to Redis server: $!")
729             unless defined $len;
730 2         9 substr $buf, 0, $len, "";
731             }
732              
733 2         6 return;
734             }
735              
736             sub __read_response {
737 2     2   13 my ($self, $cmd, $collect_errors) = @_;
738              
739 2 50       8 croak("Not connected to any server") unless $self->{sock};
740              
741 2         31 local $/ = "\r\n";
742              
743             ## no debug => fast path
744 2 50       14 return $self->__read_response_r($cmd, $collect_errors) unless $self->{debug};
745              
746 0         0 my ($result, $error) = $self->__read_response_r($cmd, $collect_errors);
747 0         0 warn "[RECV] $cmd ", Dumper($result, $error);
748 0         0 return $result, $error;
749             }
750              
## Recursive reply parser: reads one reply (of any type) and returns it as a
## two-element list ($value, $error).
##
## The first character of the line returned by __read_line selects the shape:
##   '-'        => (undef, $text)       the payload goes in the error slot
##   '+' / ':'  => ($text, undef)
##   '$'        => (undef, undef) for a negative length, otherwise the data
##                 returned by __read_len for length + 2 bytes
##   '*'        => (undef, undef) for a negative count, otherwise an array
##                 ref holding that many nested replies
##
## For '*' replies, when $collect_errors is true every element is stored as
## its own [value, error] pair; otherwise the first nested error croaks with
## "[$command] <error>, " and only the values are collected.
## Any other type marker croaks with "unknown answer type".
sub __read_response_r {
  my ($self, $command, $collect_errors) = @_;

  my ($type, $result) = $self->__read_line;

  return (undef, $result) if $type eq '-';
  return ($result, undef) if $type eq '+' || $type eq ':';

  if ($type eq '$') {
    return (undef, undef) if $result < 0;
    ## + 2 so the trailing CRLF is consumed together with the payload
    return ($self->__read_len($result + 2), undef);
  }

  croak "unknown answer type: $type ($result), " unless $type eq '*';

  return (undef, undef) if $result < 0;

  my @items;
  my $remaining = $result;
  while ($remaining--) {
    my @pair = $self->__read_response_r($command, $collect_errors);

    if (!$collect_errors) {
      croak "[$command] $pair[1], " if defined $pair[1];
      push @items, $pair[0];
      next;
    }

    push @items, \@pair;
  }

  return (\@items, undef);
}
786              
## Read one protocol line and split it into its type marker and payload.
##
## Returns a two-element list: the first character of the line and the rest
## of the line after chomp().  chomp() strips the current $/, which
## __read_response localizes to CRLF before reading.
##
## Croaks with "Error while reading from Redis server: <reason>" when
## __read_line_raw returns undef.  The socket is closed before croaking so a
## caller that traps the exception cannot keep using the connection.
sub __read_line {
  my $self = $_[0];

  my $data = $self->__read_line_raw;
  if (!defined $data) {
    ## Capture the error text first: closing the socket may itself perform
    ## system calls that overwrite $!, which would hide the real reason the
    ## read failed.
    my $reason = "$!";

    ## In case the caller catches the exception and wants to persist on using
    ## the redis connection, let's forbid that.
    $self->__close_sock();
    croak("Error while reading from Redis server: $reason");
  }

  chomp $data;
  warn "[RECV RAW] '$data'" if $self->{debug};

  ## 4-arg substr removes the marker from $data and returns it
  my $type = substr($data, 0, 1, '');
  return ($type, $data);
}
805              
## Return the next CRLF-terminated line (terminator included) from the
## connection, using $self->{__buf} as the read buffer.
##
## A complete line already sitting in the buffer is returned without touching
## the socket.  Otherwise the socket is read in BUFSIZE chunks until a CRLF
## shows up.  The returned line is removed from the buffer; anything after it
## stays there for the next call.
##
## Returns nothing (undef in scalar context) when sysread fails with anything
## other than EINTR, or when it reads zero bytes.
sub __read_line_raw {
  my $self    = $_[0];
  my $sock    = $self->{sock};
  my $buf_ref = \$self->{__buf};

  ## Serve the line from buffered data if a full one is already there
  if (length $$buf_ref) {
    my $eol = index($$buf_ref, "\r\n");
    return substr($$buf_ref, 0, $eol + 2, '') if $eol >= 0;
  }

  for (;;) {
    my $got = sysread($sock, $$buf_ref, BUFSIZE, length($$buf_ref));

    if (!defined $got) {
      ## interrupted by a signal: simply retry the read
      next if $! == EINTR;
      return;
    }
    return if !$got;

    ## Resume the search one byte before the freshly appended data, so a
    ## CRLF whose "\r" and "\n" arrived in two consecutive reads is still
    ## found.
    my $eol = index($$buf_ref, "\r\n", length($$buf_ref) - $got - 1);
    return substr($$buf_ref, 0, $eol + 2, '') if $eol >= 0;
  }
}
828              
## Consume exactly $len bytes from the connection and return them after
## chomp().
##
## Bytes already held in $self->{__buf} are used first; the socket is only
## read (in BUFSIZE chunks) while the buffer is still shorter than $len.
## Data read beyond $len stays in the buffer.
##
## Croaks with "Error while reading from Redis server: ..." when sysread
## fails for a reason other than EINTR, and with "Redis server closed
## connection" when it reads zero bytes.
sub __read_len {
  my ($self, $len) = @_;
  my $buf_ref = \$self->{__buf};

  ## How many bytes are still needed on top of what is buffered
  my $missing = $len - length($$buf_ref);

  while ($missing > 0) {
    my $got = sysread($self->{sock}, $$buf_ref, BUFSIZE, length($$buf_ref));

    if (defined $got) {
      croak("Redis server closed connection") unless $got;
      $missing -= $got;
    }
    else {
      ## interrupted by a signal: retry, anything else is fatal
      next if $! == EINTR;
      croak("Error while reading from Redis server: $!");
    }
  }

  my $data = substr($$buf_ref, 0, $len, '');
  chomp $data;
  warn "[RECV RAW] '$data'" if $self->{debug};

  return $data;
}
851              
## Attempt a single non-blocking read from $sock.
##
## Returns:
##   1             - data was received and appended to $self->{__buf}
##   0             - the read failed with EWOULDBLOCK, EAGAIN or ECONNRESET
##   nothing/undef - recv succeeded but delivered no data (treated as EOF),
##                   or recv failed while errno was 0
##
## A read interrupted by EINTR is retried.  Any other errno croaks.
sub __try_read_sock {
  my ($self, $sock) = @_;
  my $chunk = '';

  ATTEMPT: {
    my ($got, $errno);

    if (WIN32) {
      ## WIN32 doesn't support MSG_DONTWAIT, so the handle is switched to
      ## non-blocking mode by hand around the recv call.
      __fh_nonblocking_win32($sock, 1);
      $got   = recv($sock, $chunk, BUFSIZE, 0);
      $errno = 0 + $!;
      __fh_nonblocking_win32($sock, 0);
    }
    else {
      ## On Unix MSG_DONTWAIT is preferred because it needs fewer syscalls
      $got   = recv($sock, $chunk, BUFSIZE, MSG_DONTWAIT);
      $errno = 0 + $!;
    }

    if (defined $got) {
      ## no data but also no error means EOF
      return unless length($chunk);

      ## have read some data
      $self->{__buf} .= $chunk;
      return 1;
    }

    ## result is undef but errno is 0? should never happen
    return unless $errno;

    redo ATTEMPT if $errno == EINTR;

    ## EWOULDBLOCK / EAGAIN: nothing there, but the socket is alive.
    ## ECONNRESET: might be due to a timeout from the other side (on freebsd)
    ## or because an intermediate proxy shut down our connection using its
    ## internal timeout counter.
    return 0
      if $errno == EWOULDBLOCK
      || $errno == EAGAIN
      || $errno == ECONNRESET;

    ## Anything else is unexpected
    croak("Unexpected error condition $errno/$^O, please report this as a bug");
  }
}
898              
## Copied from AnyEvent::Util
##
## Issue ioctl request 0x8004667e on $fh with $flag packed as an unsigned
## long, and return ioctl's result.  __try_read_sock calls it with 1 before a
## read and 0 afterwards.
## NOTE(review): 0x8004667e is presumably the Winsock FIONBIO request code --
## confirm against AnyEvent::Util.
sub __fh_nonblocking_win32 {
  my ($fh, $flag) = @_;
  return ioctl($fh, 0x8004667e, pack("L", $flag));
}
903              
904             ##########################
905             # I take exception to that
906              
## Die with message $m.
##
## When $self->{reconnect} is true the exception is a reference to the
## message blessed into Redis::X::Reconnect; otherwise the plain message is
## thrown.  Never returns.
sub __throw_reconnect {
  my ($self, $m) = @_;

  my $exception = $self->{reconnect}
    ? bless(\$m, 'Redis::X::Reconnect')
    : $m;

  die $exception;
}
912              
913              
914             1; # End of Redis.pm
915              
916             __END__