File Coverage

blib/lib/RedisDB.pm
Criterion Covered Total %
statement 226 525 43.0
branch 90 256 35.1
condition 49 120 40.8
subroutine 35 75 46.6
pod 31 32 96.8
total 431 1008 42.7


line stmt bran cond sub pod time code
1             package RedisDB;
2              
3 17     17   943658 use strict;
  17         108  
  17         608  
4 17     17   93 use warnings;
  17         34  
  17         808  
5              
6             our $VERSION = "2.57";
7             $VERSION = eval $VERSION;
8              
9 17     17   7073 use RedisDB::Error;
  17         38  
  17         501  
10 17     17   8186 use RedisDB::Parser;
  17         70224  
  17         589  
11 17     17   10979 use IO::Socket::IP;
  17         571808  
  17         95  
12 17     17   8145 use IO::Socket::UNIX;
  17         46  
  17         189  
13 17     17   13373 use Socket qw(MSG_DONTWAIT MSG_NOSIGNAL SO_RCVTIMEO SO_SNDTIMEO);
  17         47  
  17         1154  
14 17     17   109 use POSIX qw(:errno_h);
  17         33  
  17         149  
15 17     17   6681 use Config;
  17         35  
  17         745  
16 17     17   102 use Carp;
  17         41  
  17         984  
17 17     17   104 use Try::Tiny;
  17         40  
  17         898  
18 17     17   10664 use Encode qw();
  17         170548  
  17         437  
19 17     17   9818 use URI;
  17         81366  
  17         566  
20 17     17   7881 use URI::redis;
  17         152516  
  17         67705  
21              
22             =head1 NAME
23              
24             RedisDB - Perl extension to access redis database
25              
26             =head1 SYNOPSIS
27              
28             use RedisDB;
29              
30             my $redis = RedisDB->new(host => 'localhost', port => 6379);
31             $redis->set($key, $value);
32             my $value = $redis->get($key);
33              
34             =head1 DESCRIPTION
35              
36             This module provides interface to access redis key-value store, it
37             transparently handles disconnects and forks, supports transactions,
38             pipelining, and subscription mode.
39              
40             =head1 METHODS
41              
42             =cut
43              
44             =head2 $class->new(%options)
45              
46             Creates a new RedisDB object. The following options are accepted:
47              
48             =over 4
49              
50             =item host
51              
52             domain name of the host running redis server. Default: "localhost"
53              
54             =item port
55              
56             port to connect. Default: 6379
57              
58             =item path
59              
60             you can connect to redis using UNIX socket. In this case instead of
61             L and L you should specify I.
62              
63             =item password
64              
65             Password, if redis server requires authentication. Alternatively you can use
66             I method after creating the object.
67              
68             =item database
69              
70             DB number to use. Specified database will be selected immediately after
71             connecting to the server. Database changes when you sending I
72             to the server. You can get current database using I method.
73             Default value is 0.
74              
75             =item url
76              
77             A Redis URL as described in L.
78              
79             You cannot use C together with any of C, C, C,
80             C, C.
81              
82             =item raise_error
83              
84             By default if redis-server returned error reply, or there was a connection
85             error I method throws an exception of L type, if you
86             set this parameter to false it will return an error object instead. Note, that
87             if you set this to false you should always check if the result you've got from
88             RedisDB method is a L object.
89              
90             =item timeout
91              
92             IO timeout. With this option set, if I/O operation has taken more than
93             specified number of seconds, module will croak or return
94             L error object depending on L setting.
95             Note, that some OSes do not support SO_RCVTIMEO, and SO_SNDTIMEO socket
96             options, in this case timeout will not work.
97              
98             =item utf8
99              
100             Assume that all data on the server encoded in UTF-8. As result all strings will
101             be converted to UTF-8 before sending to server, and all results will be decoded
102             from UTF-8. See L.
103              
104             =item connection_name
105              
106             After establishing a connection set its name to the specified using "CLIENT
107             SETNAME" command.
108              
109             =item lazy
110              
111             by default I establishes a connection to the server. If this parameter is
112             set, then connection will be established only when you will send first command
113             to the server.
114              
115             =item reconnect_attempts
116              
117             this parameter allows you to specify how many attempts to (re)connect to the
118             server should be made before returning an error. Default value is 1, set to -1
119             if module should try to reconnect indefinitely.
120              
121             =item reconnect_delay_max
122              
123             module waits some time before every new attempt to connect. Delay increases
124             each time. This parameter allows you to specify maximum delay between attempts
125             to reconnect. Default value is 10.
126              
127             =item on_connect_error
128              
129             if module failed to establish connection with the server it will invoke this
130             callback. First argument to the callback is a reference to the RedisDB object,
131             and second is the error description. You must not invoke any methods on the
132             object inside the callback, but you can change I and I, or I
133             attributes of the I object to point to another server. After callback
134             returned, module tries to establish connection again using new parameters. To
135             prevent further connection attempts callback should throw an exception, which
136             is done by default callback. This may be useful to switch to backup server if
137             primary went down. RedisDB distribution includes an example of using this
138             callback in eg/server_failover.pl.
139              
140             =back
141              
142             =cut
143              
144             sub new {
145 36     36 1 4044902 my $class = shift;
146 36 50       1621 my $self = ref $_[0] ? $_[0] : {@_};
147 36         256 bless $self, $class;
148 36 100 100     623 if ( $self->{path} and ( $self->{host} or $self->{port} ) ) {
      100        
149 2         157 croak "You can't specify \"path\" together with \"host\" and \"port\"";
150             }
151 34 100       397 if ( $self->{url} ) {
152 8 100 100     49 if ( $self->{host} or $self->{port} or $self->{path} ) {
      100        
153 3         39 croak "You can't specify \"url\" together with \"host\", \"port\" and \"path\"";
154             }
155              
156 5         16 $self->_parse_url( $self->{url} );
157             }
158 31   100     588 $self->{port} ||= 6379;
159 31   100     214 $self->{host} ||= 'localhost';
160 31 100       257 $self->{raise_error} = 1 unless exists $self->{raise_error};
161 31         296 $self->{_replies} = [];
162 31         251 $self->{_to_be_fetched} = 0;
163 31   100     625 $self->{database} ||= 0;
164 31   100     335 $self->{reconnect_attempts} ||= 1;
165 31   100     208 $self->{reconnect_delay_max} ||= 10;
166 31   100     428 $self->{on_connect_error} ||= \&_on_connect_error;
167 31         312 $self->_init_parser;
168 31 100       2591 $self->_connect unless $self->{lazy};
169 25         130 return $self;
170             }
171              
172             sub _parse_url {
173 5     5   13 my ($self, $url) = @_;
174              
175 5         25 my $uri = URI->new($url);
176              
177 5 50       1680 if ( $uri->scheme !~ /^redis/ ) {
178 0         0 die "Unknown URL scheme '" . $uri->scheme . "' in URL '$url'";
179             }
180              
181 5         348 $self->{host} = $uri->host;
182 5         175 $self->{port} = $uri->port;
183 5         96 $self->{path} = $uri->socket_path;
184 5         47 $self->{password} = $uri->password;
185 5         338 $self->{database} = $uri->database;
186             }
187              
188             sub _is_redisdb_error {
189 37     37   688 ref(shift) =~ /^RedisDB::Error/;
190             }
191              
192             sub _init_parser {
193 83     83   187 my $self = shift;
194             $self->{_parser} = RedisDB::Parser->new(
195             utf8 => $self->{utf8},
196 83         1517 master => $self,
197             error_class => 'RedisDB::Error',
198             );
199             }
200              
201             =head2 $self->execute($command, @arguments)
202              
203             send a command to the server, wait for the result and return it. It will throw
204             an exception if the server returns an error or return L object
205             depending on L parameter. It may be more convenient to use
206             instead of this method wrapper named after the corresponding redis command.
207             E.g.:
208              
209             $redis->execute('set', key => 'value');
210             # is the same as
211             $redis->set(key => 'value');
212              
213             See L section for the full list of defined aliases.
214              
215             Note, that you can not use I if you have sent some commands using
216             I method without the I argument and have not yet got
217             all replies.
218              
219             =cut
220              
221             sub execute {
222 45     45 1 121 my $self = shift;
223 45 50       412 croak "You can't use RedisDB::execute when you have replies to fetch."
224             if $self->replies_to_fetch;
225 45 50       133 croak "This function is not available in subscription mode." if $self->{_subscription_loop};
226 45         148 my $cmd = uc shift;
227 45         376 $self->send_command( $cmd, @_ );
228 42         215 return $self->get_reply;
229             }
230              
231             sub _on_connect_error {
232 14     14   65 my ( $self, $err ) = @_;
233 14   66     131 my $server = $self->{path} || ("$self->{host}:$self->{port}");
234 14         244 my $error_obj =
235             RedisDB::Error::DISCONNECTED->new("Couldn't connect to the redis server at $server: $!");
236 14         381 die $error_obj;
237             }
238              
239             sub _on_disconnect {
240 28     28   106 my ( $self, $err, $error_obj ) = @_;
241              
242 28 100       93 if ($err) {
243 11   66     693 $error_obj ||= RedisDB::Error::DISCONNECTED->new(
244             "Server unexpectedly closed connection. Some data might have been lost.");
245 11 100 66     1063 if ( $self->{raise_error} or $self->{_in_multi} or $self->{_watching} ) {
    50 33        
246 7         64 $self->reset_connection;
247 7         99 die $error_obj;
248             }
249             elsif ( my $loop_type = $self->{_subscription_loop} ) {
250 0         0 my $subscribed = delete $self->{_subscribed};
251 0         0 my $psubscribed = delete $self->{_psubscribed};
252 0         0 my $callback = delete $self->{_subscription_cb};
253 0         0 $self->reset_connection;
254              
255             # there's no simple way to return error from here
256             # TODO: handle it
257 0         0 $self->{raise_error}++;
258 0         0 $self->_connect;
259 0         0 $self->{_subscription_loop} = $loop_type;
260 0         0 $self->{_subscription_cb} = $callback;
261 0         0 $self->{_parser}->set_default_callback($callback);
262 0         0 $self->{_subscribed} = $subscribed;
263 0         0 $self->{_psubscribed} = $psubscribed;
264              
265 0         0 for ( keys %$subscribed ) {
266 0         0 $self->send_command( 'subscribe', $_ );
267             }
268 0         0 for ( keys %$psubscribed ) {
269 0         0 $self->send_command( 'psubscribe', $_ );
270             }
271 0         0 $self->{raise_error}--;
272             }
273             else {
274              
275             # parser may be in inconsistent state, so we just replace it with a new one
276 4         64 my $parser = delete $self->{_parser};
277 4         248 delete $self->{_socket};
278 4         116 $parser->propagate_reply($error_obj);
279             }
280             }
281             else {
282 17 50 0     85 $self->{warnings} and warn( $error_obj || "Server closed connection, reconnecting..." );
283             }
284             }
285              
286             # establish connection to the server.
287             # returns undef on success. On failure returns RedisDB::Error or throws an exception.
288             sub _connect {
289 56     56   203 my $self = shift;
290              
291             # this is to prevent recursion
292             confess "Couldn't connect to the redis-server."
293             . " Connection was immediately closed by the server."
294 56 100       397 if $self->{_in_connect};
295              
296 53         636 $self->{_pid} = $$;
297              
298 53         135 delete $self->{_socket};
299 53         89 my $error;
300 53         156 while ( not $self->{_socket} ) {
301 58 100       228 if ( $self->{path} ) {
302             $self->{_socket} = IO::Socket::UNIX->new(
303             Type => SOCK_STREAM,
304             Peer => $self->{path},
305 2 100       57 ) or $error = $!;
306             }
307             else {
308 56         126 my $attempts = $self->{reconnect_attempts};
309 56         93 my $delay;
310 56   100     396 while ( not $self->{_socket} and $attempts ) {
311 86 100       30005720 sleep $delay if $delay;
312             $self->{_socket} = IO::Socket::IP->new(
313             PeerAddr => $self->{host},
314             PeerPort => $self->{port},
315             Proto => 'tcp',
316 86 100       2906 ( $self->{timeout} ? ( Timeout => $self->{timeout} ) : () ),
    100          
317             ) or $error = $!;
318 86 100       86848 $delay = $delay ? ( 1 + rand ) * $delay : 1;
319 86 100       377 $delay = $self->{reconnect_delay_max} if $delay > $self->{reconnect_delay_max};
320 86         525 $attempts--;
321             }
322             }
323             }
324             continue {
325 58 100       1102 unless ( $self->{_socket} ) {
326 19         53 my $new_error;
327             try {
328 19     19   3096 $self->{on_connect_error}->( $self, $error );
329             }
330             catch {
331 14 100   14   307 if ( $self->{raise_error} ) {
332 6         58 $self->reset_connection;
333 6         85 die $_;
334             }
335             else {
336 8 50       176 $self->{_parser}->propagate_reply($_) if $self->{_parser};
337 8         48 $new_error = $_;
338             }
339 19         754 };
340 13 100       356 return $new_error if $new_error;
341             }
342             }
343              
344 39 100       179 if ( $self->{timeout} ) {
345 2         54 my $tv_sec = int $self->{timeout};
346 2         40 my $tv_usec = ($self->{timeout} * 1e6) % 1e6;
347 2         20 my $timeout;
348              
349             # NetBSD 6 and OpenBSD 5.5 use 64-bit time_t on all architectures
350             my $timet64;
351 2 50       244 if ( $Config{osname} eq 'netbsd' ) {
    50          
352 0         0 $Config{osvers} =~ /^([0-9]+)/;
353 0 0 0     0 if ( $1 and $1 >= 6 ) {
354 0         0 $timet64 = 1;
355             }
356             }
357             elsif ( $Config{osname} eq 'openbsd' ) {
358 0         0 $Config{osvers} =~ /^([0-9]+)\.([0-9]+)/;
359 0 0 0     0 if ( $1 and ( $1 > 5 or ( $1 == 5 and $2 >= 5 ) ) ) {
      0        
360 0         0 $timet64 = 1;
361             }
362             }
363 2 50 33     48 if ( $timet64 and $Config{longsize} == 4 ) {
364 0 0       0 if ( defined $Config{use64bitint} ) {
365 0         0 $timeout = pack( 'QL', $tv_sec, $tv_usec );
366             }
367             else {
368             $timeout = pack(
369             'LLL',
370             (
371 0 0       0 $Config{byteorder} eq '1234'
372             ? ( $tv_sec, 0, $tv_usec )
373             : ( 0, $tv_sec, $tv_usec )
374             )
375             );
376             }
377             }
378             else {
379 2         42 $timeout = pack( 'L!L!', $tv_sec, $tv_usec);
380             }
381             try {
382 2 50   2   500 defined $self->{_socket}->sockopt( SO_RCVTIMEO, $timeout )
383             or die "Can't set timeout: $!";
384 2 50       84 defined $self->{_socket}->sockopt( SO_SNDTIMEO, $timeout )
385             or die "Can't set send timeout: $!";
386             }
387             catch {
388 0     0   0 warn "$_\n";
389 2         146 };
390             }
391              
392 39         341 $self->{_in_connect}++;
393 39         216 $self->_init_parser;
394 39         1339 $self->{_subscription_loop} = 0;
395 39         89 delete $self->{_server_version};
396              
397             # authenticate
398 39 50       126 if ( $self->{password} ) {
399             $self->send_command(
400             "AUTH",
401             $self->{password},
402             sub {
403 0     0   0 my ( $self, $res ) = @_;
404 0 0       0 croak "$res" if _is_redisdb_error($res);
405             }
406 0         0 );
407             }
408              
409             # connection name
410 39 50       156 if ( $self->{connection_name} ) {
411 0         0 $self->send_command( qw(CLIENT SETNAME), $self->{connection_name}, IGNORE_REPLY() );
412             }
413              
414             # select database
415 39 100       126 if ( $self->{database} ) {
416 3         132 $self->send_command( "SELECT", $self->{database}, IGNORE_REPLY() );
417             }
418              
419 36         81 delete $self->{_in_connect};
420 36         76 return;
421             }
422              
423             my $SET_NB = 0;
424             my $DONTWAIT = 0;
425              
426             # Windows don't have MSG_DONTWAIT, so we need to switch socket into non-blocking mode
427             if ( $^O eq 'MSWin32' ) {
428             $SET_NB = 1;
429             }
430             else {
431             $DONTWAIT = MSG_DONTWAIT;
432             }
433              
434             # parse data from the receive buffer without blocking
435             # Returns undef in case of success or RedisDB::Error if failed
436             sub _recv_data_nb {
437 47     47   133 my $self = shift;
438              
439 47 50       214 $self->{_socket}->blocking(0) if $SET_NB;
440              
441 47         85 while (1) {
442 47         837 my $ret = recv( $self->{_socket}, my $buf, 131072, $DONTWAIT );
443 47 50       310 unless ( defined $ret ) {
    100          
444              
445             # socket is connected, no data in recv buffer
446 30 50 33     438 last if $! == EAGAIN or $! == EWOULDBLOCK;
447 0 0       0 next if $! == EINTR;
448              
449             # on any other error close the connection
450 0         0 my $error =
451             RedisDB::Error::DISCONNECTED->new("Error reading from server: $!");
452 0         0 $self->_on_disconnect( 1, $error );
453 0         0 return $error;
454             }
455 0         0 elsif ( $buf ne '' ) {
456              
457             # received some data
458 0         0 $self->{_parser}->parse($buf);
459             }
460             else {
461 17         2146 delete $self->{_socket};
462              
463 17 50 33     495 if ( $self->{_parser}->callbacks or $self->{_in_multi} or $self->{_watching} ) {
      33        
464              
465             # there are some replies lost
466 0         0 $self->_on_disconnect(1);
467             }
468             else {
469             # clean disconnect, try to reconnect
470 17         98 $self->_on_disconnect(0);
471             }
472              
473 17 50       75 unless ( $self->{_socket} ) {
474 17         104 my $error = $self->_connect;
475 14 100       91 return $error if $error;
476             }
477 10         30 last;
478             }
479             }
480              
481 40 50       118 $self->{_socket}->blocking(1) if $SET_NB;
482              
483 40         100 return;
484             }
485              
486             sub _queue {
487 37     37   181 my ( $self, $reply ) = @_;
488 37         91 --$self->{_to_be_fetched};
489 37         87 push @{ $self->{_replies} }, $reply;
  37         235  
490             }
491              
492             =head2 $self->send_command($command[, @arguments][, \&callback])
493              
494             send a command to the server. If send has failed command will die or return
495             L object depending on L parameter. Note, that it
496             does not return reply from the server, if I was not specified, you
497             should retrieve result using I method, otherwise I will
498             be invoked upon receiving the result with two arguments: the RedisDB object,
499             and the reply from the server. If the server returned an error, the second
500             argument to the callback will be a L object, you can get
501             description of the error using this object in string context. If you are not
502             interested in reply, you can use RedisDB::IGNORE_REPLY constant as the last
503             argument.
504              
505             Note, that RedisDB does not run any background threads, so it will not receive
506             the reply and invoke the callback unless you call some of its methods which
507             check if there are replies from the server, like I,
508             I, I, or I.
509              
510             =cut
511              
512             my $NOSIGNAL = try { MSG_NOSIGNAL } || 0;
513              
514             sub send_command {
515 54     54 1 432 my $self = shift;
516              
517 54         103 my $callback;
518 54 100       212 if ( ref $_[-1] eq 'CODE' ) {
519 7         231 $callback = pop;
520             }
521             else {
522 47         94 ++$self->{_to_be_fetched};
523 47         192 $callback = \&_queue;
524             }
525              
526 54         163 my $command = uc shift;
527 54 50       202 if ( $self->{_subscription_loop} ) {
528 0 0       0 croak "only (UN)(P)SUBSCRIBE and QUIT allowed in subscription loop"
529             unless $command =~ /^(P?(UN)?SUBSCRIBE|QUIT)$/;
530             }
531              
532             # remember password
533 54 50 33     361 if ( $command eq 'AUTH' ) {
    100          
    50          
534 0         0 $self->{password} = $_[0];
535             }
536              
537             # if SELECT has been successful, we should update database
538             elsif ( $command eq 'SELECT' ) {
539 3         27 my $cb = $callback;
540 3         33 my $dbnum = $_[0];
541             $callback = sub {
542 0 0   0   0 $_[0]->{database} = $dbnum unless ref $_[1];
543 0         0 $cb->(@_);
544 3         132 };
545             }
546              
547             # if CLIENT SETNAME we should remember the name
548             elsif ( $command eq 'CLIENT' && uc $_[0] eq 'SETNAME' ) {
549 0         0 $self->{connection_name} = $_[1];
550             }
551              
552             # if not yet connected to server, or if process was forked
553             # reestablish connection
554 54 100 66     553 unless ( $self->{_socket} and $self->{_pid} == $$ ) {
555 21         223 my $error = $self->_connect;
556 18 100       117 if ($error) {
557 4         156 $callback->( $self, $error );
558 4         16 return $error;
559             }
560             }
561              
562             # Here we are reading received data and parsing it,
563             # and at the same time checking if the connection is still alive
564 47         232 my $error = $self->_recv_data_nb;
565 44 100       143 if ($error) {
566 4         84 $callback->( $self, $error );
567 4         16 return $error;
568             }
569              
570 40         219 $self->{_parser}->push_callback($callback);
571              
572 40         421 my $request = $self->{_parser}->build_request( $command, @_ );
573             {
574 40 50       103 local $SIG{PIPE} = 'IGNORE' unless $NOSIGNAL;
  40         116  
575 40 50       2608 defined send( $self->{_socket}, $request, $NOSIGNAL )
576             or $self->_on_disconnect( 1,
577             RedisDB::Error::DISCONNECTED->new("Can't send request to server: $!") );
578             }
579              
580 40         339 return 1;
581             }
582              
583             sub _ignore {
584 0     0   0 my ( $self, $res ) = @_;
585 0 0       0 if ( _is_redisdb_error($res) ) {
586 0         0 warn "Ignoring error returned by redis-server: $res";
587             }
588             }
589              
590 3     3 0 78 sub IGNORE_REPLY { return \&_ignore; }
591              
592             =begin comment
593              
594             =head2 $self->send_command_cb($command[, @arguments][, \&callback])
595              
596             send a command to the server, invoke specified I on reply. The
597             callback is invoked with two arguments: the RedisDB object, and reply from the
598             server. If the server returned an error, the second argument will be a
599             L object, you can get description of the error using this
600             object in string context. If the I is not specified, the reply will
601             be discarded. Note, that RedisDB does not run any background threads, so it
602             will not receive the reply and invoke the callback unless you call some of its
603             methods which check if there are replies from the server, like I,
604             I, I, I, or I.
605              
606             B this method is deprecated and may be removed in some future
607             version. Please use I method instead. If you are using
608             I with I<&callback> argument, you can just replace the method
609             with I and it will do the same. If you are using
610             I with the default callback, you should add the
611             RedisDB::IGNORE_REPLY constant as the last argument when replacing the method
612             with I. Here is the example that shows equivalents with
613             I:
614              
615             $redis->send_command_cb("SET", "Key", "Value");
616             # may be replaced with
617             $redis->send_command("SET", "Key", "Value", RedisDB::IGNORE_REPLY);
618              
619             $redis->send_command_cb("GET", "Key", \&process_reply);
620             # may be replaced with
621             $redis->send_command("GET", "Key", \&process_reply);
622              
623             =end comment
624              
625             =cut
626              
627             sub send_command_cb {
628 0     0 1 0 my $self = shift;
629 0 0       0 my $callback = pop if ref $_[-1] eq 'CODE';
630 0   0     0 $callback ||= \&_ignore;
631 0         0 return $self->send_command( @_, $callback );
632             }
633              
634             =head2 $self->reply_ready
635              
636             this method may be used in the pipelining mode to check if there are some
637             replies already received from the server. Returns true if there are replies
638             ready to be fetched with I method.
639              
640             =cut
641              
642             sub reply_ready {
643 0     0 1 0 my $self = shift;
644              
645 0         0 my $error = $self->_recv_data_nb;
646 0 0       0 if ($error) {
647 0         0 $self->_on_disconnect( 1, $error );
648             }
649 0 0       0 return @{ $self->{_replies} } ? 1 : 0;
  0         0  
650             }
651              
652             =head2 $self->mainloop
653              
654             this method blocks till all replies from the server will be received. Note,
655             that callbacks for some replies may send new requests to the server and so this
656             method may block for indefinite time.
657              
658             =cut
659              
660             sub mainloop {
661 0     0 1 0 my $self = shift;
662              
663 0 0       0 return unless $self->{_parser};
664              
665 0         0 while ( $self->{_parser}->callbacks ) {
666 0 0       0 croak "You can't call mainloop in the child process" unless $self->{_pid} == $$;
667 0         0 my $ret = recv( $self->{_socket}, my $buffer, 131073, 0 );
668 0 0       0 unless ( defined $ret ) {
669 0 0       0 next if $! == EINTR;
670 0 0       0 if ( $! == EAGAIN ) {
671 0         0 confess "Timed out waiting reply from the server";
672             }
673             else {
674 0         0 $self->_on_disconnect( 1,
675             RedisDB::Error::DISCONNECTED->new("Error reading reply from the server: $!") );
676 0         0 next;
677             }
678             }
679 0 0       0 if ( $buffer ne '' ) {
680              
681             # received some data
682 0         0 $self->{_parser}->parse($buffer);
683             }
684             else {
685              
686             # disconnected
687 0         0 $self->_on_disconnect(
688             1,
689             RedisDB::Error::DISCONNECTED->new(
690             "Server unexpectedly closed connection before sending full reply")
691             );
692             }
693             }
694 0         0 return;
695             }
696              
697             =head2 $self->get_reply
698              
699             receive and return reply from the server. If the server returned an error,
700             method throws L exception or returns L object,
701             depending on the L parameter.
702              
703             =cut
704              
705             sub get_reply {
706 44     44 1 2980 my $self = shift;
707              
708             croak "We are not waiting for reply"
709 44         408 unless @{ $self->{_replies} }
710             or $self->{_to_be_fetched}
711 44 0 66     67 or $self->{_subscription_loop};
      33        
712 44 50       218 croak "You can't read reply in child process" unless $self->{_pid} == $$;
713 44         198 while ( not @{ $self->{_replies} } ) {
  82         290  
714 45         6000396 my $ret = recv( $self->{_socket}, my $buffer, 131074, 0 );
715 45 100       383 if ( not defined $ret ) {
    100          
716 2 50 33     170 next if $! == EINTR or $! == 0;
717 2         6 my $err;
718 2 50 33     68 if ( $! == EAGAIN or $! == EWOULDBLOCK ) {
719 2         192 $err = RedisDB::Error::EAGAIN->new("$!");
720             }
721             else {
722 0         0 $err = RedisDB::Error::DISCONNECTED->new("Connection error: $!");
723             }
724 2         156 $self->_on_disconnect( 1, $err );
725             }
726             elsif ( $buffer ne '' ) {
727              
728             # received some data
729 34         349 $self->{_parser}->parse($buffer);
730             }
731             else {
732              
733             # disconnected, should die unless raise_error is unset
734 9         46 $self->_on_disconnect(1);
735             }
736             }
737              
738 37         80 my $res = shift @{ $self->{_replies} };
  37         101  
739 37 50 33     132 if ( _is_redisdb_error($res)
      66        
740             and ( $self->{raise_error} or $self->{_in_multi} or $self->{_watching} ) )
741             {
742 0         0 croak $res;
743             }
744              
745 37 50       141 if ( $self->{_subscription_loop} ) {
746 0 0       0 confess "Expected multi-bulk reply, but got $res" unless ref $res;
747 0 0       0 if ( $res->[0] eq 'message' ) {
    0          
    0          
748             $self->{_subscribed}{ $res->[1] }( $self, $res->[1], undef, $res->[2] )
749 0 0       0 if $self->{_subscribed}{ $res->[1] };
750             }
751             elsif ( $res->[0] eq 'pmessage' ) {
752             $self->{_psubscribed}{ $res->[1] }( $self, $res->[2], $res->[1], $res->[3] )
753 0 0       0 if $self->{_psubscribed}{ $res->[1] };
754             }
755             elsif ( $res->[0] =~ /^p?(un)?subscribe/ ) {
756              
757             # ignore
758             }
759             else {
760 0         0 confess "Got unknown reply $res->[0] in subscription mode";
761             }
762             }
763              
764 37         697 return $res;
765             }
766              
767             =head2 $self->get_all_replies
768              
769             wait till replies to all the commands without callback set will be received.
770             Returns a list of replies to these commands. For commands with callback set
771             replies are processed as usual. Unlike I this method blocks only till
772             replies to all commands for which callback was NOT set will be received.
773              
774             =cut
775              
776             sub get_all_replies {
777 0     0 1 0 my $self = shift;
778 0         0 my @res;
779 0         0 while ( $self->replies_to_fetch ) {
780 0         0 push @res, $self->get_reply;
781             }
782 0         0 return @res;
783             }
784              
785             =head2 $self->replies_to_fetch
786              
787             return the number of commands sent to the server replies to which were not yet
788             retrieved with I or I. This number only includes
789             commands for which callback was not set.
790              
791             =cut
792              
793             sub replies_to_fetch {
794 45     45 1 98 my $self = shift;
795 45         127 return $self->{_to_be_fetched} + @{ $self->{_replies} };
  45         246  
796             }
797              
798             =head2 $self->selected_database
799              
800             get currently selected database.
801              
802             =cut
803              
804             sub selected_database {
805 0     0 1 0 shift->{database};
806             }
807              
808             =head2 $self->reset_connection
809              
810             reset connection. This method closes existing connection and drops all
811             previously sent requests. After invoking this method the object returns to the
812             same state as it was returned by the constructor.
813              
814             =cut
815              
816             sub reset_connection {
817 13     13 1 43 my $self = shift;
818 13         1238 delete $self->{$_} for grep /^_/, keys %$self;
819 13         82 $self->{_replies} = [];
820 13         74 $self->_init_parser;
821 13         368 $self->{_to_be_fetched} = 0;
822 13         65 return;
823             }
824              
825             =head2 $self->version
826              
827             return the version of the server the client is connected to. The version is
828             returned as a floating point number represented the same way as the perl
829             versions. E.g. for redis 2.1.12 it will return 2.001012.
830              
831             =cut
832              
833             sub version {
834 0     0 1 0 my $self = shift;
835 0         0 my $info = $self->info;
836 0 0       0 $info->{redis_version} =~ /^([0-9]+)[.]([0-9]+)(?:[.]([0-9]+))?/
837             or croak "Can't parse version string: $info->{redis_version}";
838 0 0       0 $self->{_server_version} = $1 + 0.001 * $2 + ( $3 ? 0.000001 * $3 : 0 );
839 0         0 return $self->{_server_version};
840             }
841              
842             # don't forget to update POD
843             my @commands = qw(
844             append asking auth bgrewriteaof bgsave bitcount bitop bitpos
845             blpop brpop brpoplpush client client_kill client_getname client_setname
846             cluster command
847             config config_get config_set config_resetstat config_rewrite
848             dbsize debug_error debug_object debug_segfault
849             decr decrby del dump echo eval evalsha exists expire expireat flushall
850             flushdb geoadd geodist geohash geopos georadius georadiusbymember
851             get getbit getrange getset hdel hexists hget hgetall
852             hincrby hincrbyfloat hkeys hlen hmget hscan hmset hset hsetnx hvals incr incrby
853             incrbyfloat keys lastsave lindex linsert llen lpop lpush lpushx
854             lrange lrem lset ltrim mget migrate move mset msetnx object object_refcount
855             object_encoding object_idletime persist pexpire pexpireat pfadd pfcount pfmerge ping psetex pttl
856             pubsub pubsub_channels pubsub_numsub pubsub_numpat
857             publish quit randomkey rename renamenx restore rpop rpoplpush
858             rpush rpushx sadd save scan scard script script_exists script_flush script_kill
859             script_load sdiff sdiffstore select set
860             setbit setex setnx setrange sinter sinterstore
861             sismember slaveof slowlog smembers smove sort spop srandmember
862             srem sscan strlen sunion sunionstore time ttl type
863             zadd zcard zcount zincrby zinterstore zlexcount zrange zrangebylex
864             zrangebyscore zrank zrem zremrangebylex
865             zremrangebyrank zremrangebyscore zrevrange zrevrangebyscore zrevrank
866             zscan zscore zunionstore
867             );
868              
869             sub _simple_commands {
870 0     0   0 return @commands;
871             }
872              
873             =head1 WRAPPER METHODS
874              
875             Instead of using I and I methods directly, it may be
876             more convenient to use wrapper methods with names matching names of the redis
877             commands. These methods call I or I depending on the
878             presence of the callback argument. If callback is specified, the method invokes
879             I and returns as soon as the command has been sent to the server;
880             when the reply is received, it will be passed to the callback (see
881             L). If there is no callback, the method invokes
882             I, waits for the reply from the server, and returns it. E.g.:
883              
884             $val = $redis->get($key);
885             # equivalent to
886             $val = $redis->execute("get", $key);
887              
888             $redis->get($key, sub { $val = $_[1] });
889             # equivalent to
890             $redis->send_command("get", $key, sub { $val = $_[1] });
891              
892             The following wrapper methods are defined: append, asking, auth, bgrewriteaof, bgsave,
893             bitcount, bitop, bitpos, blpop, brpop, brpoplpush, client, client_kill,
894             client_getname, client_setname, cluster, command, config, config_get, config_set,
895             config_resetstat, config_rewrite, dbsize, debug_error, debug_object, debug_segfault, decr,
896             decrby, del, dump, echo, eval, evalsha, exists, expire, expireat, flushall,
897             flushdb, geoadd, geodist, geohash, geopos, georadius, georadiusbymember,
898             get, getbit, getrange, getset, hdel, hexists, hget, hgetall, hincrby,
899             hincrbyfloat, hkeys, hlen, hmget, hscan, hmset, hset, hsetnx, hvals, incr,
900             incrby, incrbyfloat, keys, lastsave, lindex, linsert, llen, lpop, lpush,
901             lpushx, lrange, lrem, lset, ltrim, mget, migrate, move, mset, msetnx, object,
902             object_refcount, object_encoding, object_idletime, persist, pexpire, pexpireat,
903             pfadd, pfcount, pfmerge, ping, psetex, pttl, publish, pubsub, pubsub_channels, pubsub_numsub,
904             pubsub_numpat, quit, randomkey, rename, renamenx, restore, rpop, rpoplpush,
905             rpush, rpushx, sadd, save, scan, scard, script, script_exists, script_flush,
906             script_kill, script_load, sdiff, sdiffstore, select, set, setbit, setex, setnx,
907             setrange, sinter, sinterstore, sismember, slaveof, slowlog, smembers, smove,
908             sort, spop, srandmember, srem, sscan strlen, sunion, sunionstore, time,
909             ttl, type, unwatch, watch, zadd, zcard, zcount, zincrby, zinterstore,
910             zlexcount, zrange, zrangebylex, zrangebyscore, zrank, zrem, zremrangebylex,
911             zremrangebyrank, zremrangebyscore, zrevrange, zrevrangebyscore, zrevrank,
912             zscan, zscore, zunionstore.
913              
914             See description of all commands in redis documentation at
915             L.
916              
917             =cut
918              
919             for my $command (@commands) {
920             my @uccom = split /_/, uc $command;
921 17     17   161 no strict 'refs';
  17         48  
  17         24528  
922             *{ __PACKAGE__ . "::$command" } = sub {
923 49     49   1044081 my $self = shift;
924 49 100       275 if ( ref $_[-1] eq 'CODE' ) {
925 4         204 return $self->send_command( @uccom, @_ );
926             }
927             else {
928 45         270 return $self->execute( @uccom, @_ );
929             }
930             };
931             }
932              
933             =pod
934              
935             The following commands implement some additional postprocessing of the results:
936              
937             =cut
938              
939             sub _execute_with_postprocess {
940 0     0     my $self = shift;
941 0           my $ppsub = pop;
942 0 0 0       if ( $_[-1] && ref $_[-1] eq 'CODE' ) {
943 0           my $orig = pop;
944             my $cb = sub {
945 0     0     my ( $redis, $reply ) = @_;
946 0 0         $reply = $ppsub->($reply) unless _is_redisdb_error($reply);
947 0           $orig->( $redis, $reply );
948 0           };
949 0           return $self->send_command( @_, $cb );
950             }
951             else {
952 0           my $reply = $self->execute(@_);
953 0 0         $reply = $ppsub->($reply) unless _is_redisdb_error($reply);
954 0           return $reply;
955             }
956             }
957              
958             =head2 $self->info([\&callback])
959              
960             return information and statistics about the server. Redis-server returns
961             information in form of I, the I method parses result and
962             returns it as a hash reference.
963              
964             =cut
965              
966             sub info {
967 0     0 1   my $self = shift;
968 0           return $self->_execute_with_postprocess('INFO', @_, \&_parse_info);
969             }
970              
971             sub _parse_info {
972 0     0     my $info = shift;
973 0 0 0       return $info if !$info || ref $info;
974 0           my %info = map { /^([^:]+):(.*)$/ } split /\r\n/, $info;
  0            
975 0           return \%info;
976             }
977              
978             =head2 $self->client_list([\&callback])
979              
980             return list of clients connected to the server. This method parses server
981             output and returns result as reference to array of hashes.
982              
983             =cut
984              
985             sub client_list {
986 0     0 1   my $self = shift;
987 0           return $self->_execute_with_postprocess('CLIENT', 'LIST', @_, \&_parse_client_list);
988             }
989              
990             sub _parse_client_list {
991 0     0     my $list = shift;
992 0 0 0       return $list if !$list || ref $list;
993 0           my @clients = split /\015?\012/, $list;
994 0           my $res = [];
995 0           for (@clients) {
996 0 0         my %cli = map { /^([^=]+)=(.*)$/ ? ( $1, $2 ) : () } split / /;
  0            
997 0           push @$res, \%cli;
998             }
999 0           return $res;
1000             }
1001              
1002             =head2 $self->cluster_info([\&callback])
1003              
1004             return information and statistics about the cluster. Redis-server returns
1005             information in form of I, the I method parses result
1006             and returns it as a hash reference.
1007              
1008             =cut
1009              
1010             sub cluster_info {
1011 0     0 1   my $self = shift;
1012 0           return $self->_execute_with_postprocess('CLUSTER', 'INFO', @_, \&_parse_info);
1013             }
1014              
1015             =head2 $self->cluster_nodes([\&callback])
1016              
1017             return list of cluster nodes. Each node represented as a hash with the
1018             following keys: node_id, address, host, port, flags, master_id, last_ping_sent,
1019             last_pong_received, link_state, slots.
1020              
1021             =cut
1022              
1023             sub cluster_nodes {
1024 0     0 1   my $self = shift;
1025             return $self->_execute_with_postprocess( 'CLUSTER', 'NODES', @_,
1026 0     0     sub { $self->_parse_cluster_nodes(@_) } );
  0            
1027             }
1028              
1029             sub _parse_cluster_nodes {
1030 0     0     my ($self, $list) = @_;
1031              
1032 0           my @nodes;
1033 0           for ( split /^/, $list ) {
1034 0           my ( $node_id, $addr, $flags, $master_id, $ping, $pong, $state, @slots ) =
1035             split / /;
1036 0           my %flags = map { $_ => 1 } split /,/, $flags;
  0            
1037 0           my ( $host_port ) = split /@/, $addr;
1038 0           my ( $host, $port ) = split /:([^:]+)$/, $host_port;
1039 0 0         unless ($host) {
1040 0           $host = $self->{host}, $addr = "$self->{host}:$port",
1041             }
1042 0           my $node = {
1043             node_id => $node_id,
1044             address => $addr,
1045             host => $host,
1046             port => $port,
1047             flags => \%flags,
1048             master_id => $master_id,
1049             last_ping_sent => $ping,
1050             last_pong_received => $pong,
1051             link_state => $state,
1052             slots => \@slots,
1053             };
1054 0           push @nodes, $node;
1055             }
1056              
1057 0           return \@nodes;
1058             }
1059              
1060             sub _parse_role {
1061 0     0     my $role = shift;
1062              
1063 0           my $parsed = {
1064             role => $role->[0],
1065             };
1066 0 0         if ( $parsed->{role} eq 'master' ) {
    0          
    0          
1067 0           $parsed->{replication_offset} = $role->[1];
1068 0           for ( @{ $role->[2] } ) {
  0            
1069 0           push @{ $parsed->{slaves} },
  0            
1070             {
1071             host => $_->[0],
1072             port => $_->[1],
1073             replication_offset => $_->[2],
1074             };
1075             }
1076             }
1077             elsif ( $parsed->{role} eq 'slave' ) {
1078             $parsed->{master} = {
1079 0           host => $role->[1],
1080             port => $role->[2],
1081             };
1082 0           $parsed->{status} = $role->[3];
1083 0           $parsed->{replication_offset} = $role->[4];
1084             }
1085             elsif ( $parsed->{role} eq 'sentinel' ) {
1086 0           for ( @{ $role->[1] } ) {
  0            
1087 0           push @{ $parsed->{services} }, $_;
  0            
1088             }
1089             } else {
1090 0           confess "Unknown role $parsed->{role}";
1091             }
1092              
1093 0           return $parsed;
1094             }
1095              
1096             =head2 $self->role([\&callback])
1097              
1098             return reference to a hash describing the role of the server. Hash contains
1099             "role" element that can be either "master", "slave", or "sentinel". For master
1100             hash will also contain "replication_offset" and "slaves" elements, for slave it
1101             will contain "master", "status", and "replication_offset" elements, and for
1102             sentinel it will contain "services".
1103              
1104             =cut
1105              
1106             sub role {
1107 0     0 1   my $self = shift;
1108 0           return $self->_execute_with_postprocess( 'ROLE', @_, \&_parse_role );
1109             }
1110              
1111             =head2 $self->shutdown
1112              
1113             Shuts the redis server down. Returns undef, as the server doesn't send the
1114             answer. Croaks in case of the error.
1115              
1116             =cut
1117              
1118             sub shutdown {
1119 0     0 1   my $self = shift;
1120 0           $self->send_command_cb( 'SHUTDOWN', @_ );
1121 0           return;
1122             }
1123              
1124             =head2 $self->scan_all([MATCH => $pattern,][COUNT => $count,])
1125              
1126             this method starts a new SCAN iteration and executes SCAN commands till cursor
1127             returned by server is 0. It then returns all the keys returned by server during
1128             the iteration. MATCH and COUNT are passed to SCAN command. In case of success
1129             returns reference to array with matching keys, in case of error dies or returns
1130             L object depending on I option.
1131              
1132             =cut
1133              
1134             sub scan_all {
1135 0     0 1   my $self = shift;
1136 0 0         if ( ref $_[-1] eq 'CODE' ) {
1137 0           croak "scan_all does not accept callback parameter";
1138             }
1139 0           my $cursor = 0;
1140 0           my @result;
1141 0           do {
1142 0           my $res = $self->execute( 'SCAN', $cursor, @_ );
1143              
1144             # in case of error just return it
1145 0 0         return $res unless ref $res eq 'ARRAY';
1146 0           $cursor = $res->[0];
1147 0           push @result, @{ $res->[1] };
  0            
1148             } while $cursor;
1149 0           return \@result;
1150             }
1151              
1152             =head2 $self->hscan_all($key, [MATCH => $pattern,][COUNT => $count,])
1153              
1154             =head2 $self->sscan_all($key, [MATCH => $pattern,][COUNT => $count,])
1155              
1156             =head2 $self->zscan_all($key, [MATCH => $pattern,][COUNT => $count,])
1157              
1158             these three methods are doing the same thing as I except that they
1159             require a key as the first parameter, and they iterate using HSCAN, SSCAN and
1160             ZSCAN commands.
1161              
1162             =cut
1163              
1164             for my $command (qw(hscan sscan zscan)) {
1165             my $uccom = uc $command;
1166 17     17   159 no strict 'refs';
  17         42  
  17         28052  
1167             my $name = "${command}_all";
1168             *{ __PACKAGE__ . "::$name" } = sub {
1169 0     0     my $self = shift;
1170 0           my $key = shift;
1171 0 0         if ( ref $_[-1] eq 'CODE' ) {
1172 0           croak "$name does not accept callback parameter";
1173             }
1174 0           my $cursor = 0;
1175 0           my @result;
1176 0           do {
1177 0           my $res = $self->execute( $uccom, $key, $cursor, @_ );
1178 0 0         return $res unless ref $res eq 'ARRAY';
1179 0           $cursor = $res->[0];
1180 0           push @result, @{ $res->[1] };
  0            
1181             } while $cursor;
1182 0           return \@result;
1183             };
1184             }
1185              
1186             =head1 UTF-8 SUPPORT
1187              
1188             The redis protocol is designed to work with the binary data, both keys and
1189             values are encoded in the same way as sequences of octets. By default this
1190             module expects all data to be just strings of bytes. There is an option to
1191             treat all data as UTF-8 strings. If you pass I parameter to the
1192             constructor, module will encode all strings to UTF-8 before sending them to
1193             server, and will decode all strings received from server from UTF-8. This has
1194             following repercussions you should be aware off: first, you can't store binary
1195             data on server with this option on, it would be treated as a sequence of latin1
1196             characters, and would be converted into a corresponding sequence of UTF-8
1197             encoded characters; second, if data returned by the server is not a valid UTF-8
1198             encoded string, the module will croak, and you will have to reinitialize the
1199             connection. The parser only checks for invalid UTF-8 byte sequences, it doesn't
1200             check if input contains invalid code points. Generally, using this option is
1201             not recommended.
1202              
1203             =cut
1204              
1205             =head1 ERROR HANDLING
1206              
1207             If L parameter was set to true in the constructor (which is
1208             default setting), then module will throw an exception in case network IO
1209             function returned an error, or if redis-server returned an error reply. Network
1210             exceptions belong to L or
1211             L class, if redis-server returned an error
1212             exception will be of L class. If the object was in subscription
1213             mode, you will have to restore all the subscriptions. If the object was in the
1214             middle of transaction, when after network error you will have to start the
1215             transaction again.
1216              
1217             If L parameter was disabled, then instead of throwing an
1218             exception, module will return exception object and also pass this exception
1219             object to every callback waiting for the reply from the server. If the object
1220             is in subscription mode, then module will automatically restore all
1221             subscriptions after reconnect. Note, that during transaction L is
1222             always enabled, so any error will throw an exception.
1223              
1224             =cut
1225              
1226             =head1 HANDLING OF SERVER DISCONNECTS
1227              
1228             Redis server may close a connection if it was idle for some time, also the
1229             connection may be closed in case when redis-server was restarted, or just
1230             because of the network problem. RedisDB always tries to restore connection to
1231             the server if no data has been lost as a result of disconnect, and if
1232             L parameter disabled it will try to reconnect even if disconnect
1233             happened during data transmission. E.g. if the client was idle for some time
1234             and the redis server closed the connection, it will be transparently restored
1235             when you send a command next time no matter if L enabled or not.
1236             If you sent a command and the server has closed the connection without sending
1237             a complete reply, then module will act differently depending on L
1238             value. If L enabled, the module will cancel all current
1239             callbacks, reset the object to the initial state, and throw an exception of
1240             L class, next time you use the object it will
1241             establish a new connection. If L disabled, the module will pass
1242             L object to all outstanding callbacks and will
1243             try to reconnect to the server; it will also automatically restore
1244             subscriptions if object was in subscription mode. Module never tries to
1245             reconnect after MULTI or WATCH command was sent to server and before
1246             corresponding UNWATCH, EXEC or DISCARD was sent as this may cause data
1247             corruption, so during transaction module behaves like if L is
1248             set.
1249              
1250             Module makes several attempts to reconnect each time increasing interval before
1251             the next attempt, depending on the values of L and
1252             L. After each failed attempt to connect module will
1253             invoke L callback which for example may change redis-server
1254             hostname, so on next attempt module will try to connect to different server.
1255              
1256             =cut
1257              
1258             =head1 PIPELINING
1259              
1260             You can send commands in the pipelining mode. It means you are sending multiple
1261             commands to the server without waiting for the replies. This is implemented by
1262             the I method. Recommended way of using it is to pass a reference
1263             to the callback function as the last argument. When module receives reply from
1264             the server, it will call this function with two arguments: reference to the
1265             RedisDB object, and reply from the server. It is important to understand
1266             though, that RedisDB does not run any background threads, neither it checks for
1267             the replies by setting some timer, so e.g. in the following example callback
1268             will never be invoked:
1269              
1270             my $pong;
1271             $redis->send_command( "ping", sub { $pong = $_[1] } );
1272             sleep 1 while not $pong; # this will never return
1273              
1274             Therefore you need periodically trigger check for the replies. The check is
1275             triggered when you call the following methods: I, I,
1276             I, I. Calling wrapper method, like
1277             C<< $redis->get('key') >>, will also trigger check as internally wrapper methods
1278             use methods listed above.
1279              
1280             Also you can omit callback argument when invoke I. In this case
1281             you have to fetch reply later explicitly using I method. This is how
1282             synchronous I is implemented, basically it is:
1283              
1284             sub execute {
1285             my $self = shift;
1286             $self->send_command(@_);
1287             return $self->get_reply;
1288             }
1289              
1290             That is why it is not allowed to call I unless you have got replies to
1291             all commands sent previously with I without callback. Using
1292             I without callback is not recommended.
1293              
1294             Sometimes you are not interested in replies sent by the server, e.g. SET
1295             command usually just return 'OK', in this case you can pass to I
1296             callback which ignores its arguments, or use C constant, it
1297             is a no-op function:
1298              
1299             for (@keys) {
1300             # execute will not just send 'GET' command to the server,
1301             # but it will also receive response to the 'SET' command sent on
1302             # the previous loop iteration
1303             my $val = $redis->execute( "get", $_ );
1304             $redis->send_command( "set", $_, fun($val), RedisDB::IGNORE_REPLY );
1305             }
1306             # and this will wait for the last reply
1307             $redis->mainloop;
1308              
1309             or using L you can rewrite it as:
1310              
1311             for (@keys) {
1312             my $val = $redis->get($_);
1313             $redis->set( $_, fun($val), RedisDB::IGNORE_REPLY );
1314             }
1315             $redis->mainloop;
1316              
1317             =cut
1318              
1319             =head1 PUB/SUB MESSAGING
1320              
1321             RedisDB supports subscriptions to redis channels. In the subscription mode you
1322             can subscribe to some channels and receive all the messages sent to these
1323             channels. You can subscribe to channels and then manually check messages using
1324             I method, or you can invoke I method, which will
1325             block in loop waiting for messages and invoking callback for each received
1326             message. In the first case you can use I and I methods
1327             to subscribe to channels and then you can use I method to get
1328             messages from the channel:
1329              
1330             $redis->subscribe(
1331             foo => sub {
1332             my ( $redis, $channel, $patern, $message ) = @_;
1333             print "Foo: $message\n";
1334             }
1335             );
1336             # Wait for messages
1337             $res = $redis->get_reply;
1338              
1339             I method for messages from the channel will invoke callback
1340             specified as the second optional argument of the I method and will
1341             also return raw replies from the server, both for messages from the channels
1342             and for informational messages from the redis server. If you do not want to
1343             block in I method, you can check if there are any messages using
1344             I method.
1345              
1346             In the second case you invoke I method, it subscribes to
1347             specified channels and waits for messages, when a message arrived it invokes
1348             callback defined for the channel from which the message came. Here is an
1349             example:
1350              
1351             my $message_cb = sub {
1352             my ( $redis, $channel, $pattern, $message ) = @_;
1353             print "$channel: $message\n";
1354             };
1355              
1356             my $control_cb = sub {
1357             my ( $redis, $channel, $pattern, $message ) = @_;
1358             if ( $channel eq 'control.quit' ) {
1359             $redis->unsubscribe;
1360             $redis->punsubscribe;
1361             }
1362             elsif ( $channel eq 'control.subscribe' ) {
1363             $redis->subscribe($message);
1364             }
1365             };
1366              
1367             $redis->subscription_loop(
1368             subscribe => [ 'news', ],
1369             psubscribe => [ 'control.*' => $control_cb ],
1370             default_callback => $message_cb,
1371             );
1372              
1373             subscription_loop will subscribe you to the "news" channel and "control.*"
1374             channels. It will call specified callbacks every time a new message received.
1375             When message came from "control.subscribe" channel, callback subscribes to an
1376             additional channel. When message came from "control.quit" channel, callback
1377             unsubscribes from all channels.
1378              
1379             Callbacks used in subscription mode receive four arguments: the RedisDB object,
1380             the channel from which the message came, the pattern if you subscribed to this
1381             channel using I method, and the message itself.
1382              
1383             Once you switched into subscription mode using either I or
1384             I command, or by entering I, you only can send
1385             I, I, I, and I commands to
1386             the server, other commands will throw an exception.
1387              
1388             You can publish messages into the channels using the I method. This
1389             method should be called when you in the normal mode, and can't be used while
1390             you're in the subscription mode.
1391              
1392             Following methods can be used in subscription mode:
1393              
1394             =cut
1395              
1396             =head2 $self->subscription_loop(%parameters)
1397              
1398             Enter into the subscription mode. The method subscribes you to the specified
1399             channels, waits for the messages, and invokes the appropriate callback for
1400             every received message. The method returns after you unsubscribed from all the
1401             channels. It accepts the following parameters:
1402              
1403             =over 4
1404              
1405             =item default_callback
1406              
1407             reference to the default callback. This callback is invoked for a message if you
1408             didn't specify other callback for the channel this message comes from.
1409              
1410             =item subscribe
1411              
1412             an array reference. Contains the list of channels you want to subscribe. A
1413             channel name may be optionally followed by the reference to a callback function
1414             for this channel. E.g.:
1415              
1416             [ 'news', 'messages', 'errors' => \&error_cb, 'other' ]
1417              
1418             channels "news", "messages", and "other" will use default callback, but for
1419             the "errors" channel error_cb function will be used.
1420              
1421             =item psubscribe
1422              
1423             same as subscribe, but you specify patterns for channels' names.
1424              
1425             =back
1426              
1427             All parameters are optional, but you must subscribe at least to one channel. Also
1428             if default_callback is not specified, you have to explicitly specify a callback
1429             for every channel you are going to subscribe.
1430              
1431             =cut
1432              
1433             sub subscription_loop {
1434 0     0 1   my ( $self, %args ) = @_;
1435 0 0         croak "Already in subscription loop" if $self->{_subscription_loop} > 0;
1436 0 0         croak "You can't start subscription loop while in pipelining mode."
1437             if $self->replies_to_fetch;
1438 0   0       $self->{_subscribed} ||= {};
1439 0   0       $self->{_psubscribed} ||= {};
1440 0           $self->{_subscription_cb} = $args{default_callback};
1441 0           $self->{_subscription_loop} = 1;
1442 0           $self->{_parser}->set_default_callback( \&_queue );
1443              
1444 0 0         if ( $args{subscribe} ) {
1445 0           while ( my $channel = shift @{ $args{subscribe} } ) {
  0            
1446 0           my $cb;
1447 0 0         $cb = shift @{ $args{subscribe} } if ref $args{subscribe}[0] eq 'CODE';
  0            
1448 0           $self->subscribe( $channel, $cb );
1449             }
1450             }
1451 0 0         if ( $args{psubscribe} ) {
1452 0           while ( my $channel = shift @{ $args{psubscribe} } ) {
  0            
1453 0           my $cb;
1454 0 0         $cb = shift @{ $args{psubscribe} } if ref $args{psubscribe}[0] eq 'CODE';
  0            
1455 0           $self->psubscribe( $channel, $cb );
1456             }
1457             }
1458             croak "You must subscribe at least to one channel"
1459 0 0 0       unless ( keys %{ $self->{_subscribed} } or keys %{ $self->{_psubscribed} } );
  0            
  0            
1460              
1461 0           while ( $self->{_subscription_loop} ) {
1462 0           $self->get_reply;
1463             }
1464 0           return;
1465             }
1466              
1467             =head2 $self->subscribe($channel[, \&callback])
1468              
1469             Subscribe to the I<$channel>. If I<$callback> is not specified, default
1470             callback will be used in subscription loop, or messages will be returned by
1471             I if you are not using subscription loop.
1472              
1473             =cut
1474              
1475             sub subscribe {
1476 0     0 1   my ( $self, $channel, $callback ) = @_;
1477 0 0         unless ( $self->{_subscription_loop} ) {
1478 0           $self->{_subscription_loop} = -1;
1479 0           $self->{_subscription_cb} = \&_queue;
1480 0           $self->{_parser}->set_default_callback( \&_queue );
1481             }
1482 0 0         croak "Subscribe to what channel?" unless length $channel;
1483 0 0         if ( $self->{_subscription_loop} > 0 ) {
1484             $callback ||= $self->{_subscription_cb}
1485 0 0 0       or croak "Callback for $channel not specified, neither default callback defined";
1486             }
1487             else {
1488 0   0 0     $callback ||= sub { 1 };
  0            
1489             }
1490 0           $self->{_subscribed}{$channel} = $callback;
1491 0           $self->send_command( "SUBSCRIBE", $channel, \&_queue );
1492 0           return;
1493             }
1494              
1495             =head2 $self->psubscribe($pattern[, \&callback])
1496              
1497             Subscribe to channels matching I<$pattern>. If I<$callback> is not specified,
1498             default callback will be used in subscription loop, or messages will be
1499             returned by I if you are not using subscription loop.
1500              
1501             =cut
1502              
1503             sub psubscribe {
1504 0     0 1   my ( $self, $channel, $callback ) = @_;
1505 0 0         unless ( $self->{_subscription_loop} ) {
1506 0           $self->{_subscription_loop} = -1;
1507 0           $self->{_subscription_cb} = \&_queue;
1508 0           $self->{_parser}->set_default_callback( \&_queue );
1509             }
1510 0 0         croak "Subscribe to what channel?" unless length $channel;
1511 0 0         if ( $self->{_subscription_loop} > 0 ) {
1512             $callback ||= $self->{_subscription_cb}
1513 0 0 0       or croak "Callback for $channel not specified, neither default callback defined";
1514             }
1515             else {
1516 0   0 0     $callback ||= sub { 1 };
  0            
1517             }
1518 0           $self->{_psubscribed}{$channel} = $callback;
1519 0           $self->send_command( "PSUBSCRIBE", $channel, \&_queue );
1520 0           return;
1521             }
1522              
1523             =head2 $self->unsubscribe([@channels])
1524              
1525             Unsubscribe from the listed I<@channels>. If no channels was specified,
1526             unsubscribe from all the channels to which you have subscribed using
1527             I.
1528              
1529             =cut
1530              
1531             sub unsubscribe {
1532 0     0 1   my $self = shift;
1533 0 0         if (@_) {
1534 0           delete $self->{_subscribed}{$_} for @_;
1535             }
1536             else {
1537 0           $self->{_subscribed} = {};
1538             }
1539 0 0 0       if ( %{ $self->{_subscribed} }
  0            
1540 0 0         or %{ $self->{_psubscribed} || {} } )
1541             {
1542 0           return $self->send_command( "UNSUBSCRIBE", @_ );
1543             }
1544             else {
1545 0           delete $self->{_subscription_loop};
1546 0           $self->{_to_be_fetched} = 0;
1547 0           return $self->_connect;
1548             }
1549             }
1550              
1551             =head2 $self->punsubscribe([@patterns])
1552              
1553             Unsubscribe from the listed I<@patterns>. If no patterns was specified,
1554             unsubscribe from all the channels to which you have subscribed using
1555             I.
1556              
1557             =cut
1558              
1559             sub punsubscribe {
1560 0     0 1   my $self = shift;
1561 0 0         if (@_) {
1562 0           delete $self->{_psubscribed}{$_} for @_;
1563             }
1564             else {
1565 0           $self->{_psubscribed} = {};
1566             }
1567 0 0 0       if ( %{ $self->{_subscribed} || {} }
  0 0          
1568 0           or %{ $self->{_psubscribed} } )
1569             {
1570 0           return $self->send_command( "PUNSUBSCRIBE", @_ );
1571             }
1572             else {
1573 0           delete $self->{_subscription_loop};
1574 0           $self->{_to_be_fetched} = 0;
1575 0           return $self->_connect;
1576             }
1577             }
1578              
1579             =head2 $self->subscribed
1580              
1581             Return list of channels to which you have subscribed using I
1582              
1583             =cut
1584              
1585             sub subscribed {
1586 0     0 1   return keys %{ shift->{_subscribed} };
  0            
1587             }
1588              
1589             =head2 $self->psubscribed
1590              
1591             Return list of channels to which you have subscribed using I
1592              
1593             =cut
1594              
1595             sub psubscribed {
1596 0     0 1   return keys %{ shift->{_psubscribed} };
  0            
1597             }
1598              
1599             =head1 TRANSACTIONS
1600              
1601             Transactions allow you to execute a sequence of commands in a single step. In
1602             order to start a transaction you should use the I method. After you
1603             have entered a transaction all the commands you issue are queued, but not
1604             executed till you call the I method. Typically these commands return
1605             string "QUEUED" as a result, but if there is an error in e.g. number of
1606             arguments, they may return an error. When you call exec, all the queued
1607             commands will be executed and exec will return a list of results for every
1608             command in the transaction. If instead of I you call I, all
1609             scheduled commands will be canceled.
1610              
1611             You can set some keys as watched. If any watched key has been changed by
1612             another client before you called exec, the transaction will be discarded and
1613             exec will return false value.
1614              
1615             =cut
1616              
1617             =head2 $self->watch(@keys[, \&callback])
1618              
1619             mark given keys to be watched
1620              
1621             =cut
1622              
1623             sub watch {
1624 0     0 1   my $self = shift;
1625              
1626 0           $self->{_watching} = 1;
1627 0 0         if ( ref $_[-1] eq 'CODE' ) {
1628 0           return $self->send_command( 'WATCH', @_ );
1629             }
1630             else {
1631 0           return $self->execute( 'WATCH', @_ );
1632             }
1633             }
1634              
1635             =head2 $self->unwatch([\&callback])
1636              
1637             unwatch all keys
1638              
1639             =cut
1640              
1641             sub unwatch {
1642 0     0 1   my $self = shift;
1643              
1644 0           my $res;
1645 0 0         if ( ref $_[-1] eq 'CODE' ) {
1646 0           $res = $self->send_command( 'UNWATCH', @_ );
1647             }
1648             else {
1649 0           $res = $self->execute( 'UNWATCH', @_ );
1650             }
1651 0           $self->{_watching} = undef;
1652 0           return $res;
1653             }
1654              
1655             =head2 $self->multi([\&callback])
1656              
1657             Enter the transaction. After this and till I or I will be called,
1658             all the commands will be queued but not executed.
1659              
1660             =cut
1661              
1662             sub multi {
1663 0     0 1   my $self = shift;
1664              
1665 0 0         die "Multi calls can not be nested!" if $self->{_in_multi};
1666 0           $self->{_in_multi} = 1;
1667 0 0         if ( ref $_[-1] eq 'CODE' ) {
1668 0           return $self->send_command( 'MULTI', @_ );
1669             }
1670             else {
1671 0           return $self->execute('MULTI');
1672             }
1673             }
1674              
1675             =head2 $self->exec([\&callback])
1676              
1677             Execute all queued commands and finish the transaction. Returns a list of
1678             results for every command. Will croak if some command has failed. Also
1679             unwatches all the keys. If some of the watched keys has been changed by other
1680             client, the transaction will be canceled and I will return false.
1681              
1682             =cut
1683              
1684             sub exec {
1685 0     0 1   my $self = shift;
1686              
1687 0           my $res;
1688 0 0         if ( ref $_[-1] eq 'CODE' ) {
1689 0           $res = $self->send_command( 'EXEC', @_ );
1690             }
1691             else {
1692 0           $res = $self->execute('EXEC');
1693             }
1694 0           $self->{_in_multi} = undef;
1695 0           $self->{_watching} = undef;
1696 0           return $res;
1697             }
1698              
1699             =head2 $self->discard([\&callback])
1700              
1701             Discard all queued commands without executing them and unwatch all keys.
1702              
1703             =cut
1704              
1705             sub discard {
1706 0     0 1   my $self = shift;
1707              
1708 0           my $res;
1709 0 0         if ( ref $_[-1] eq 'CODE' ) {
1710 0           $res = $self->send_command( 'DISCARD', @_ );
1711             }
1712             else {
1713 0           $res = $self->execute('DISCARD');
1714             }
1715 0           $self->{_in_multi} = undef;
1716 0           $self->{_watching} = undef;
1717 0           return $res;
1718             }
1719              
1720             =head1 CLUSTER SUPPORT
1721              
1722             For accessing redis cluster use L package
1723              
1724             =head1 SENTINEL SUPPORT
1725              
1726             For accessing redis servers managed by sentinel use L package
1727              
1728             =cut
1729              
1730             1;
1731              
1732             __END__