File Coverage

blib/lib/RedisDB.pm
Criterion Covered Total %
statement 226 525 43.0
branch 90 256 35.1
condition 49 120 40.8
subroutine 35 75 46.6
pod 31 32 96.8
total 431 1008 42.7


line stmt bran cond sub pod time code
1             package RedisDB;
2              
3 17     17   798225 use strict;
  17         85  
  17         479  
4 17     17   71 use warnings;
  17         29  
  17         721  
5             our $VERSION = "2.55";
6             $VERSION = eval $VERSION;
7              
8 17     17   5656 use RedisDB::Error;
  17         41  
  17         409  
9 17     17   6444 use RedisDB::Parser;
  17         56700  
  17         487  
10 17     17   8517 use IO::Socket::IP;
  17         468315  
  17         83  
11 17     17   7483 use IO::Socket::UNIX;
  17         38  
  17         163  
12 17     17   10848 use Socket qw(MSG_DONTWAIT MSG_NOSIGNAL SO_RCVTIMEO SO_SNDTIMEO);
  17         39  
  17         1014  
13 17     17   87 use POSIX qw(:errno_h);
  17         32  
  17         76  
14 17     17   5677 use Config;
  17         34  
  17         725  
15 17     17   87 use Carp;
  17         25  
  17         848  
16 17     17   128 use Try::Tiny;
  17         36  
  17         764  
17 17     17   8989 use Encode qw();
  17         139488  
  17         689  
18 17     17   8024 use URI;
  17         65678  
  17         480  
19 17     17   6776 use URI::redis;
  17         138398  
  17         56462  
20              
21             =head1 NAME
22              
23             RedisDB - Perl extension to access redis database
24              
25             =head1 SYNOPSIS
26              
27             use RedisDB;
28              
29             my $redis = RedisDB->new(host => 'localhost', port => 6379);
30             $redis->set($key, $value);
31             my $value = $redis->get($key);
32              
33             =head1 DESCRIPTION
34              
35             This module provides interface to access redis key-value store, it
36             transparently handles disconnects and forks, supports transactions,
37             pipelining, and subscription mode.
38              
39             =head1 METHODS
40              
41             =cut
42              
43             =head2 $class->new(%options)
44              
45             Creates a new RedisDB object. The following options are accepted:
46              
47             =over 4
48              
49             =item host
50              
51             domain name of the host running redis server. Default: "localhost"
52              
53             =item port
54              
55             port to connect. Default: 6379
56              
57             =item path
58              
59             you can connect to redis using UNIX socket. In this case instead of
60             L and L you should specify I.
61              
62             =item password
63              
64             Password, if redis server requires authentication. Alternatively you can use
65             I method after creating the object.
66              
67             =item database
68              
69             DB number to use. Specified database will be selected immediately after
70             connecting to the server. Database changes when you sending I
71             to the server. You can get current database using I method.
72             Default value is 0.
73              
74             =item url
75              
76             A Redis URL as described in L.
77              
78             You cannot use C together with any of C, C, C,
79             C, C.
80              
81             =item raise_error
82              
83             By default if redis-server returned error reply, or there was a connection
84             error I method throws an exception of L type, if you
85             set this parameter to false it will return an error object instead. Note, that
86             if you set this to false you should always check if the result you've got from
87             RedisDB method is a L object.
88              
89             =item timeout
90              
91             IO timeout. With this option set, if I/O operation has taken more than
92             specified number of seconds, module will croak or return
93             L error object depending on L setting.
94             Note, that some OSes do not support SO_RCVTIMEO, and SO_SNDTIMEO socket
95             options, in this case timeout will not work.
96              
97             =item utf8
98              
99             Assume that all data on the server encoded in UTF-8. As result all strings will
100             be converted to UTF-8 before sending to server, and all results will be decoded
101             from UTF-8. See L.
102              
103             =item connection_name
104              
105             After establishing a connection set its name to the specified using "CLIENT
106             SETNAME" command.
107              
108             =item lazy
109              
110             by default I establishes a connection to the server. If this parameter is
111             set, then connection will be established only when you will send first command
112             to the server.
113              
114             =item reconnect_attempts
115              
116             this parameter allows you to specify how many attempts to (re)connect to the
117             server should be made before returning an error. Default value is 1, set to -1
118             if module should try to reconnect indefinitely.
119              
120             =item reconnect_delay_max
121              
122             module waits some time before every new attempt to connect. Delay increases
123             each time. This parameter allows you to specify maximum delay between attempts
124             to reconnect. Default value is 10.
125              
126             =item on_connect_error
127              
128             if module failed to establish connection with the server it will invoke this
129             callback. First argument to the callback is a reference to the RedisDB object,
130             and second is the error description. You must not invoke any methods on the
131             object inside the callback, but you can change I and I, or I
132             attributes of the I object to point to another server. After callback
133             returned, module tries to establish connection again using new parameters. To
134             prevent further connection attempts callback should throw an exception, which
135             is done by default callback. This may be useful to switch to backup server if
136             primary went down. RedisDB distribution includes an example of using this
137             callback in eg/server_failover.pl.
138              
139             =back
140              
141             =cut
142              
143             sub new {
144 36     36 1 3238898 my $class = shift;
145 36 50       1108 my $self = ref $_[0] ? $_[0] : {@_};
146 36         161 bless $self, $class;
147 36 100 100     457 if ( $self->{path} and ( $self->{host} or $self->{port} ) ) {
      100        
148 2         138 croak "You can't specify \"path\" together with \"host\" and \"port\"";
149             }
150 34 100       160 if ( $self->{url} ) {
151 8 100 100     59 if ( $self->{host} or $self->{port} or $self->{path} ) {
      100        
152 3         31 croak "You can't specify \"url\" together with \"host\", \"port\" and \"path\"";
153             }
154              
155 5         19 $self->_parse_url( $self->{url} );
156             }
157 31   100     483 $self->{port} ||= 6379;
158 31   100     249 $self->{host} ||= 'localhost';
159 31 100       218 $self->{raise_error} = 1 unless exists $self->{raise_error};
160 31         312 $self->{_replies} = [];
161 31         181 $self->{_to_be_fetched} = 0;
162 31   100     553 $self->{database} ||= 0;
163 31   100     249 $self->{reconnect_attempts} ||= 1;
164 31   100     173 $self->{reconnect_delay_max} ||= 10;
165 31   100     341 $self->{on_connect_error} ||= \&_on_connect_error;
166 31         249 $self->_init_parser;
167 31 100       2033 $self->_connect unless $self->{lazy};
168 25         152 return $self;
169             }
170              
171             sub _parse_url {
172 5     5   16 my ($self, $url) = @_;
173              
174 5         37 my $uri = URI->new($url);
175              
176 5 50       1540 if ( $uri->scheme !~ /^redis/ ) {
177 0         0 die "Unknown URL scheme '" . $uri->scheme . "' in URL '$url'";
178             }
179              
180 5         251 $self->{host} = $uri->host;
181 5         385 $self->{port} = $uri->port;
182 5         94 $self->{path} = $uri->socket_path;
183 5         57 $self->{password} = $uri->password;
184 5         403 $self->{database} = $uri->database;
185             }
186              
187             sub _is_redisdb_error {
188 37     37   636 ref(shift) =~ /^RedisDB::Error/;
189             }
190              
191             sub _init_parser {
192 83     83   141 my $self = shift;
193             $self->{_parser} = RedisDB::Parser->new(
194             utf8 => $self->{utf8},
195 83         1094 master => $self,
196             error_class => 'RedisDB::Error',
197             );
198             }
199              
200             =head2 $self->execute($command, @arguments)
201              
202             send a command to the server, wait for the result and return it. It will throw
203             an exception if the server returns an error or return L object
204             depending on L parameter. It may be more convenient to use
205             instead of this method wrapper named after the corresponding redis command.
206             E.g.:
207              
208             $redis->execute('set', key => 'value');
209             # is the same as
210             $redis->set(key => 'value');
211              
212             See L section for the full list of defined aliases.
213              
214             Note, that you can not use I if you have sent some commands using
215             I method without the I argument and have not yet got
216             all replies.
217              
218             =cut
219              
220             sub execute {
221 45     45 1 79 my $self = shift;
222 45 50       414 croak "You can't use RedisDB::execute when you have replies to fetch."
223             if $self->replies_to_fetch;
224 45 50       178 croak "This function is not available in subscription mode." if $self->{_subscription_loop};
225 45         174 my $cmd = uc shift;
226 45         291 $self->send_command( $cmd, @_ );
227 42         170 return $self->get_reply;
228             }
229              
230             sub _on_connect_error {
231 14     14   45 my ( $self, $err ) = @_;
232 14   66     159 my $server = $self->{path} || ("$self->{host}:$self->{port}");
233 14         203 my $error_obj =
234             RedisDB::Error::DISCONNECTED->new("Couldn't connect to the redis server at $server: $!");
235 14         264 die $error_obj;
236             }
237              
238             sub _on_disconnect {
239 28     28   91 my ( $self, $err, $error_obj ) = @_;
240              
241 28 100       70 if ($err) {
242 11   66     605 $error_obj ||= RedisDB::Error::DISCONNECTED->new(
243             "Server unexpectedly closed connection. Some data might have been lost.");
244 11 100 66     744 if ( $self->{raise_error} or $self->{_in_multi} or $self->{_watching} ) {
    50 33        
245 7         90 $self->reset_connection;
246 7         55 die $error_obj;
247             }
248             elsif ( my $loop_type = $self->{_subscription_loop} ) {
249 0         0 my $subscribed = delete $self->{_subscribed};
250 0         0 my $psubscribed = delete $self->{_psubscribed};
251 0         0 my $callback = delete $self->{_subscription_cb};
252 0         0 $self->reset_connection;
253              
254             # there's no simple way to return error from here
255             # TODO: handle it
256 0         0 $self->{raise_error}++;
257 0         0 $self->_connect;
258 0         0 $self->{_subscription_loop} = $loop_type;
259 0         0 $self->{_subscription_cb} = $callback;
260 0         0 $self->{_parser}->set_default_callback($callback);
261 0         0 $self->{_subscribed} = $subscribed;
262 0         0 $self->{_psubscribed} = $psubscribed;
263              
264 0         0 for ( keys %$subscribed ) {
265 0         0 $self->send_command( 'subscribe', $_ );
266             }
267 0         0 for ( keys %$psubscribed ) {
268 0         0 $self->send_command( 'psubscribe', $_ );
269             }
270 0         0 $self->{raise_error}--;
271             }
272             else {
273              
274             # parser may be in inconsistent state, so we just replace it with a new one
275 4         36 my $parser = delete $self->{_parser};
276 4         220 delete $self->{_socket};
277 4         112 $parser->propagate_reply($error_obj);
278             }
279             }
280             else {
281 17 50 0     155 $self->{warnings} and warn( $error_obj || "Server closed connection, reconnecting..." );
282             }
283             }
284              
285             # establish connection to the server.
286             # returns undef on success. On failure returns RedisDB::Error or throws an exception.
287             sub _connect {
288 56     56   145 my $self = shift;
289              
290             # this is to prevent recursion
291             confess "Couldn't connect to the redis-server."
292             . " Connection was immediately closed by the server."
293 56 100       301 if $self->{_in_connect};
294              
295 53         442 $self->{_pid} = $$;
296              
297 53         129 delete $self->{_socket};
298 53         66 my $error;
299 53         353 while ( not $self->{_socket} ) {
300 58 100       134 if ( $self->{path} ) {
301             $self->{_socket} = IO::Socket::UNIX->new(
302             Type => SOCK_STREAM,
303             Peer => $self->{path},
304 2 100       45 ) or $error = $!;
305             }
306             else {
307 56         104 my $attempts = $self->{reconnect_attempts};
308 56         56 my $delay;
309 56   100     425 while ( not $self->{_socket} and $attempts ) {
310 86 100       30003673 sleep $delay if $delay;
311             $self->{_socket} = IO::Socket::IP->new(
312             PeerAddr => $self->{host},
313             PeerPort => $self->{port},
314             Proto => 'tcp',
315 86 100       1586 ( $self->{timeout} ? ( Timeout => $self->{timeout} ) : () ),
    100          
316             ) or $error = $!;
317 86 100       71505 $delay = $delay ? ( 1 + rand ) * $delay : 1;
318 86 100       283 $delay = $self->{reconnect_delay_max} if $delay > $self->{reconnect_delay_max};
319 86         418 $attempts--;
320             }
321             }
322             }
323             continue {
324 58 100       913 unless ( $self->{_socket} ) {
325 19         38 my $new_error;
326             try {
327 19     19   2568 $self->{on_connect_error}->( $self, $error );
328             }
329             catch {
330 14 100   14   220 if ( $self->{raise_error} ) {
331 6         43 $self->reset_connection;
332 6         59 die $_;
333             }
334             else {
335 8 50       96 $self->{_parser}->propagate_reply($_) if $self->{_parser};
336 8         36 $new_error = $_;
337             }
338 19         604 };
339 13 100       206 return $new_error if $new_error;
340             }
341             }
342              
343 39 100       121 if ( $self->{timeout} ) {
344 2         48 my $tv_sec = int $self->{timeout};
345 2         20 my $tv_usec = ($self->{timeout} * 1e6) % 1e6;
346 2         18 my $timeout;
347              
348             # NetBSD 6 and OpenBSD 5.5 use 64-bit time_t on all architectures
349             my $timet64;
350 2 50       220 if ( $Config{osname} eq 'netbsd' ) {
    50          
351 0         0 $Config{osvers} =~ /^([0-9]+)/;
352 0 0 0     0 if ( $1 and $1 >= 6 ) {
353 0         0 $timet64 = 1;
354             }
355             }
356             elsif ( $Config{osname} eq 'openbsd' ) {
357 0         0 $Config{osvers} =~ /^([0-9]+)\.([0-9]+)/;
358 0 0 0     0 if ( $1 and ( $1 > 5 or ( $1 == 5 and $2 >= 5 ) ) ) {
      0        
359 0         0 $timet64 = 1;
360             }
361             }
362 2 50 33     80 if ( $timet64 and $Config{longsize} == 4 ) {
363 0 0       0 if ( defined $Config{use64bitint} ) {
364 0         0 $timeout = pack( 'QL', $tv_sec, $tv_usec );
365             }
366             else {
367             $timeout = pack(
368             'LLL',
369             (
370 0 0       0 $Config{byteorder} eq '1234'
371             ? ( $tv_sec, 0, $tv_usec )
372             : ( 0, $tv_sec, $tv_usec )
373             )
374             );
375             }
376             }
377             else {
378 2         40 $timeout = pack( 'L!L!', $tv_sec, $tv_usec);
379             }
380             try {
381 2 50   2   442 defined $self->{_socket}->sockopt( SO_RCVTIMEO, $timeout )
382             or die "Can't set timeout: $!";
383 2 50       68 defined $self->{_socket}->sockopt( SO_SNDTIMEO, $timeout )
384             or die "Can't set send timeout: $!";
385             }
386             catch {
387 0     0   0 warn "$_\n";
388 2         108 };
389             }
390              
391 39         297 $self->{_in_connect}++;
392 39         138 $self->_init_parser;
393 39         1118 $self->{_subscription_loop} = 0;
394 39         68 delete $self->{_server_version};
395              
396             # authenticate
397 39 50       93 if ( $self->{password} ) {
398             $self->send_command(
399             "AUTH",
400             $self->{password},
401             sub {
402 0     0   0 my ( $self, $res ) = @_;
403 0 0       0 croak "$res" if _is_redisdb_error($res);
404             }
405 0         0 );
406             }
407              
408             # connection name
409 39 50       79 if ( $self->{connection_name} ) {
410 0         0 $self->send_command( qw(CLIENT SETNAME), $self->{connection_name}, IGNORE_REPLY() );
411             }
412              
413             # select database
414 39 100       104 if ( $self->{database} ) {
415 3         90 $self->send_command( "SELECT", $self->{database}, IGNORE_REPLY() );
416             }
417              
418 36         61 delete $self->{_in_connect};
419 36         100 return;
420             }
421              
422             my $SET_NB = 0;
423             my $DONTWAIT = 0;
424              
425             # Windows don't have MSG_DONTWAIT, so we need to switch socket into non-blocking mode
426             if ( $^O eq 'MSWin32' ) {
427             $SET_NB = 1;
428             }
429             else {
430             $DONTWAIT = MSG_DONTWAIT;
431             }
432              
433             # parse data from the receive buffer without blocking
434             # Returns undef in case of success or RedisDB::Error if failed
435             sub _recv_data_nb {
436 47     47   75 my $self = shift;
437              
438 47 50       1300 $self->{_socket}->blocking(0) if $SET_NB;
439              
440 47         91 while (1) {
441 47         662 my $ret = recv( $self->{_socket}, my $buf, 131072, $DONTWAIT );
442 47 50       173 unless ( defined $ret ) {
    100          
443              
444             # socket is connected, no data in recv buffer
445 30 50 33     328 last if $! == EAGAIN or $! == EWOULDBLOCK;
446 0 0       0 next if $! == EINTR;
447              
448             # on any other error close the connection
449 0         0 my $error =
450             RedisDB::Error::DISCONNECTED->new("Error reading from server: $!");
451 0         0 $self->_on_disconnect( 1, $error );
452 0         0 return $error;
453             }
454 0         0 elsif ( $buf ne '' ) {
455              
456             # received some data
457 0         0 $self->{_parser}->parse($buf);
458             }
459             else {
460 17         1566 delete $self->{_socket};
461              
462 17 50 33     369 if ( $self->{_parser}->callbacks or $self->{_in_multi} or $self->{_watching} ) {
      33        
463              
464             # there are some replies lost
465 0         0 $self->_on_disconnect(1);
466             }
467             else {
468             # clean disconnect, try to reconnect
469 17         75 $self->_on_disconnect(0);
470             }
471              
472 17 50       49 unless ( $self->{_socket} ) {
473 17         79 my $error = $self->_connect;
474 14 100       70 return $error if $error;
475             }
476 10         25 last;
477             }
478             }
479              
480 40 50       170 $self->{_socket}->blocking(1) if $SET_NB;
481              
482 40         98 return;
483             }
484              
485             sub _queue {
486 37     37   150 my ( $self, $reply ) = @_;
487 37         76 --$self->{_to_be_fetched};
488 37         65 push @{ $self->{_replies} }, $reply;
  37         198  
489             }
490              
491             =head2 $self->send_command($command[, @arguments][, \&callback])
492              
493             send a command to the server. If send has failed command will die or return
494             L object depending on L parameter. Note, that it
495             does not return reply from the server, if I was not specified, you
496             should retrieve result using I method, otherwise I will
497             be invoked upon receiving the result with two arguments: the RedisDB object,
498             and the reply from the server. If the server returned an error, the second
499             argument to the callback will be a L object, you can get
500             description of the error using this object in string context. If you are not
501             interested in reply, you can use RedisDB::IGNORE_REPLY constant as the last
502             argument.
503              
504             Note, that RedisDB does not run any background threads, so it will not receive
505             the reply and invoke the callback unless you call some of its methods which
506             check if there are replies from the server, like I,
507             I, I, or I.
508              
509             =cut
510              
511             my $NOSIGNAL = try { MSG_NOSIGNAL } || 0;
512              
513             sub send_command {
514 54     54 1 871 my $self = shift;
515              
516 54         69 my $callback;
517 54 100       177 if ( ref $_[-1] eq 'CODE' ) {
518 7         54 $callback = pop;
519             }
520             else {
521 47         89 ++$self->{_to_be_fetched};
522 47         102 $callback = \&_queue;
523             }
524              
525 54         112 my $command = uc shift;
526 54 50       132 if ( $self->{_subscription_loop} ) {
527 0 0       0 croak "only (UN)(P)SUBSCRIBE and QUIT allowed in subscription loop"
528             unless $command =~ /^(P?(UN)?SUBSCRIBE|QUIT)$/;
529             }
530              
531             # remember password
532 54 50 33     300 if ( $command eq 'AUTH' ) {
    100          
    50          
533 0         0 $self->{password} = $_[0];
534             }
535              
536             # if SELECT has been successful, we should update database
537             elsif ( $command eq 'SELECT' ) {
538 3         33 my $cb = $callback;
539 3         30 my $dbnum = $_[0];
540             $callback = sub {
541 0 0   0   0 $_[0]->{database} = $dbnum unless ref $_[1];
542 0         0 $cb->(@_);
543 3         78 };
544             }
545              
546             # if CLIENT SETNAME we should remember the name
547             elsif ( $command eq 'CLIENT' && uc $_[0] eq 'SETNAME' ) {
548 0         0 $self->{connection_name} = $_[1];
549             }
550              
551             # if not yet connected to server, or if process was forked
552             # reestablish connection
553 54 100 66     408 unless ( $self->{_socket} and $self->{_pid} == $$ ) {
554 21         174 my $error = $self->_connect;
555 18 100       69 if ($error) {
556 4         36 $callback->( $self, $error );
557 4         8 return $error;
558             }
559             }
560              
561             # Here we are reading received data and parsing it,
562             # and at the same time checking if the connection is still alive
563 47         199 my $error = $self->_recv_data_nb;
564 44 100       117 if ($error) {
565 4         76 $callback->( $self, $error );
566 4         8 return $error;
567             }
568              
569 40         160 $self->{_parser}->push_callback($callback);
570              
571 40         256 my $request = $self->{_parser}->build_request( $command, @_ );
572             {
573 40 50       61 local $SIG{PIPE} = 'IGNORE' unless $NOSIGNAL;
  40         129  
574 40 50       4919 defined send( $self->{_socket}, $request, $NOSIGNAL )
575             or $self->_on_disconnect( 1,
576             RedisDB::Error::DISCONNECTED->new("Can't send request to server: $!") );
577             }
578              
579 40         394 return 1;
580             }
581              
582             sub _ignore {
583 0     0   0 my ( $self, $res ) = @_;
584 0 0       0 if ( _is_redisdb_error($res) ) {
585 0         0 warn "Ignoring error returned by redis-server: $res";
586             }
587             }
588              
589 3     3 0 78 sub IGNORE_REPLY { return \&_ignore; }
590              
591             =begin comment
592              
593             =head2 $self->send_command_cb($command[, @arguments][, \&callback])
594              
595             send a command to the server, invoke specified I on reply. The
596             callback is invoked with two arguments: the RedisDB object, and reply from the
597             server. If the server returned an error, the second argument will be a
598             L object, you can get description of the error using this
599             object in string context. If the I is not specified, the reply will
600             be discarded. Note, that RedisDB does not run any background threads, so it
601             will not receive the reply and invoke the callback unless you call some of its
602             methods which check if there are replies from the server, like I,
603             I, I, I, or I.
604              
605             B this method is deprecated and may be removed in some future
606             version. Please use I method instead. If you are using
607             I with I<&callback> argument, you can just replace the method
608             with I and it will do the same. If you are using
609             I with the default callback, you should add the
610             RedisDB::IGNORE_REPLY constant as the last argument when replacing the method
611             with I. Here is the example that shows equivalents with
612             I:
613              
614             $redis->send_command_cb("SET", "Key", "Value");
615             # may be replaced with
616             $redis->send_command("SET", "Key", "Value", RedisDB::IGNORE_REPLY);
617              
618             $redis->send_command_cb("GET", "Key", \&process_reply);
619             # may be replaced with
620             $redis->send_command("GET", "Key", \&process_reply);
621              
622             =end comment
623              
624             =cut
625              
626             sub send_command_cb {
627 0     0 1 0 my $self = shift;
628 0 0       0 my $callback = pop if ref $_[-1] eq 'CODE';
629 0   0     0 $callback ||= \&_ignore;
630 0         0 return $self->send_command( @_, $callback );
631             }
632              
633             =head2 $self->reply_ready
634              
635             this method may be used in the pipelining mode to check if there are some
636             replies already received from the server. Returns true if there are replies
637             ready to be fetched with I method.
638              
639             =cut
640              
641             sub reply_ready {
642 0     0 1 0 my $self = shift;
643              
644 0         0 my $error = $self->_recv_data_nb;
645 0 0       0 if ($error) {
646 0         0 $self->_on_disconnect( 1, $error );
647             }
648 0 0       0 return @{ $self->{_replies} } ? 1 : 0;
  0         0  
649             }
650              
651             =head2 $self->mainloop
652              
653             this method blocks till all replies from the server will be received. Note,
654             that callbacks for some replies may send new requests to the server and so this
655             method may block for indefinite time.
656              
657             =cut
658              
659             sub mainloop {
660 0     0 1 0 my $self = shift;
661              
662 0 0       0 return unless $self->{_parser};
663              
664 0         0 while ( $self->{_parser}->callbacks ) {
665 0 0       0 croak "You can't call mainloop in the child process" unless $self->{_pid} == $$;
666 0         0 my $ret = recv( $self->{_socket}, my $buffer, 131073, 0 );
667 0 0       0 unless ( defined $ret ) {
668 0 0       0 next if $! == EINTR;
669 0 0       0 if ( $! == EAGAIN ) {
670 0         0 confess "Timed out waiting reply from the server";
671             }
672             else {
673 0         0 $self->_on_disconnect( 1,
674             RedisDB::Error::DISCONNECTED->new("Error reading reply from the server: $!") );
675 0         0 next;
676             }
677             }
678 0 0       0 if ( $buffer ne '' ) {
679              
680             # received some data
681 0         0 $self->{_parser}->parse($buffer);
682             }
683             else {
684              
685             # disconnected
686 0         0 $self->_on_disconnect(
687             1,
688             RedisDB::Error::DISCONNECTED->new(
689             "Server unexpectedly closed connection before sending full reply")
690             );
691             }
692             }
693 0         0 return;
694             }
695              
696             =head2 $self->get_reply
697              
698             receive and return reply from the server. If the server returned an error,
699             method throws L exception or returns L object,
700             depending on the L parameter.
701              
702             =cut
703              
704             sub get_reply {
705 44     44 1 3181 my $self = shift;
706              
707             croak "We are not waiting for reply"
708 44         308 unless @{ $self->{_replies} }
709             or $self->{_to_be_fetched}
710 44 0 66     47 or $self->{_subscription_loop};
      33        
711 44 50       183 croak "You can't read reply in child process" unless $self->{_pid} == $$;
712 44         55 while ( not @{ $self->{_replies} } ) {
  82         255  
713 45         6056554 my $ret = recv( $self->{_socket}, my $buffer, 131074, 0 );
714 45 100       342 if ( not defined $ret ) {
    100          
715 2 50 33     68 next if $! == EINTR or $! == 0;
716 2         24 my $err;
717 2 50 33     42 if ( $! == EAGAIN or $! == EWOULDBLOCK ) {
718 2         116 $err = RedisDB::Error::EAGAIN->new("$!");
719             }
720             else {
721 0         0 $err = RedisDB::Error::DISCONNECTED->new("Connection error: $!");
722             }
723 2         106 $self->_on_disconnect( 1, $err );
724             }
725             elsif ( $buffer ne '' ) {
726              
727             # received some data
728 34         297 $self->{_parser}->parse($buffer);
729             }
730             else {
731              
732             # disconnected, should die unless raise_error is unset
733 9         36 $self->_on_disconnect(1);
734             }
735             }
736              
737 37         48 my $res = shift @{ $self->{_replies} };
  37         81  
738 37 50 33     126 if ( _is_redisdb_error($res)
      66        
739             and ( $self->{raise_error} or $self->{_in_multi} or $self->{_watching} ) )
740             {
741 0         0 croak $res;
742             }
743              
744 37 50       113 if ( $self->{_subscription_loop} ) {
745 0 0       0 confess "Expected multi-bulk reply, but got $res" unless ref $res;
746 0 0       0 if ( $res->[0] eq 'message' ) {
    0          
    0          
747             $self->{_subscribed}{ $res->[1] }( $self, $res->[1], undef, $res->[2] )
748 0 0       0 if $self->{_subscribed}{ $res->[1] };
749             }
750             elsif ( $res->[0] eq 'pmessage' ) {
751             $self->{_psubscribed}{ $res->[1] }( $self, $res->[2], $res->[1], $res->[3] )
752 0 0       0 if $self->{_psubscribed}{ $res->[1] };
753             }
754             elsif ( $res->[0] =~ /^p?(un)?subscribe/ ) {
755              
756             # ignore
757             }
758             else {
759 0         0 confess "Got unknown reply $res->[0] in subscription mode";
760             }
761             }
762              
763 37         343 return $res;
764             }
765              
766             =head2 $self->get_all_replies
767              
768             wait till replies to all the commands without callback set will be received.
769             Returns a list of replies to these commands. For commands with callback set
770             replies are processed as usual. Unlike I this method blocks only till
771             replies to all commands for which callback was NOT set will be received.
772              
773             =cut
774              
775             sub get_all_replies {
776 0     0 1 0 my $self = shift;
777 0         0 my @res;
778 0         0 while ( $self->replies_to_fetch ) {
779 0         0 push @res, $self->get_reply;
780             }
781 0         0 return @res;
782             }
783              
784             =head2 $self->replies_to_fetch
785              
786             return the number of commands sent to the server replies to which were not yet
787             retrieved with I or I. This number only includes
788             commands for which callback was not set.
789              
790             =cut
791              
792             sub replies_to_fetch {
793 45     45 1 93 my $self = shift;
794 45         81 return $self->{_to_be_fetched} + @{ $self->{_replies} };
  45         172  
795             }
796              
797             =head2 $self->selected_database
798              
799             get currently selected database.
800              
801             =cut
802              
803             sub selected_database {
804 0     0 1 0 shift->{database};
805             }
806              
807             =head2 $self->reset_connection
808              
809             reset connection. This method closes existing connection and drops all
810             previously sent requests. After invoking this method the object returns to the
811             same state as it was returned by the constructor.
812              
813             =cut
814              
815             sub reset_connection {
816 13     13 1 40 my $self = shift;
817 13         824 delete $self->{$_} for grep /^_/, keys %$self;
818 13         56 $self->{_replies} = [];
819 13         60 $self->_init_parser;
820 13         256 $self->{_to_be_fetched} = 0;
821 13         27 return;
822             }
823              
824             =head2 $self->version
825              
826             return the version of the server the client is connected to. The version is
827             returned as a floating point number represented the same way as the perl
828             versions. E.g. for redis 2.1.12 it will return 2.001012.
829              
830             =cut
831              
832             sub version {
833 0     0 1 0 my $self = shift;
834 0         0 my $info = $self->info;
835 0 0       0 $info->{redis_version} =~ /^([0-9]+)[.]([0-9]+)(?:[.]([0-9]+))?/
836             or croak "Can't parse version string: $info->{redis_version}";
837 0 0       0 $self->{_server_version} = $1 + 0.001 * $2 + ( $3 ? 0.000001 * $3 : 0 );
838 0         0 return $self->{_server_version};
839             }
840              
841             # don't forget to update POD
842             my @commands = qw(
843             append asking auth bgrewriteaof bgsave bitcount bitop bitpos
844             blpop brpop brpoplpush client client_kill client_getname client_setname
845             cluster command
846             config config_get config_set config_resetstat config_rewrite
847             dbsize debug_error debug_object debug_segfault
848             decr decrby del dump echo eval evalsha exists expire expireat flushall
849             flushdb geoadd geodist geohash geopos georadius georadiusbymember
850             get getbit getrange getset hdel hexists hget hgetall
851             hincrby hincrbyfloat hkeys hlen hmget hscan hmset hset hsetnx hvals incr incrby
852             incrbyfloat keys lastsave lindex linsert llen lpop lpush lpushx
853             lrange lrem lset ltrim mget migrate move mset msetnx object object_refcount
854             object_encoding object_idletime persist pexpire pexpireat pfadd pfcount pfmerge ping psetex pttl
855             pubsub pubsub_channels pubsub_numsub pubsub_numpat
856             publish quit randomkey rename renamenx restore rpop rpoplpush
857             rpush rpushx sadd save scan scard script script_exists script_flush script_kill
858             script_load sdiff sdiffstore select set
859             setbit setex setnx setrange sinter sinterstore
860             sismember slaveof slowlog smembers smove sort spop srandmember
861             srem sscan strlen sunion sunionstore time ttl type
862             zadd zcard zcount zincrby zinterstore zlexcount zrange zrangebylex
863             zrangebyscore zrank zrem zremrangebylex
864             zremrangebyrank zremrangebyscore zrevrange zrevrangebyscore zrevrank
865             zscan zscore zunionstore
866             );
867              
868             sub _simple_commands {
869 0     0   0 return @commands;
870             }
871              
872             =head1 WRAPPER METHODS
873              
874             Instead of using I and I methods directly, it may be
875             more convenient to use wrapper methods with names matching names of the redis
876             commands. These methods call I or I depending on the
877             presence of the callback argument. If callback is specified, the method invokes
878             I and returns as soon as the command has been sent to the server;
879             when the reply is received, it will be passed to the callback (see
880             L). If there is no callback, the method invokes
881             I, waits for the reply from the server, and returns it. E.g.:
882              
883             $val = $redis->get($key);
884             # equivalent to
885             $val = $redis->execute("get", $key);
886              
887             $redis->get($key, sub { $val = $_[1] });
888             # equivalent to
889             $redis->send_command("get", $key, sub { $val = $_[1] });
890              
891             The following wrapper methods are defined: append, asking, auth, bgrewriteaof, bgsave,
892             bitcount, bitop, bitpos, blpop, brpop, brpoplpush, client, client_kill,
893             client_getname, client_setname, cluster, command, config, config_get, config_set,
894             config_resetstat, config_rewrite, dbsize, debug_error, debug_object, debug_segfault, decr,
895             decrby, del, dump, echo, eval, evalsha, exists, expire, expireat, flushall,
896             flushdb, geoadd, geodist, geohash, geopos, georadius, georadiusbymember,
897             get, getbit, getrange, getset, hdel, hexists, hget, hgetall, hincrby,
898             hincrbyfloat, hkeys, hlen, hmget, hscan, hmset, hset, hsetnx, hvals, incr,
899             incrby, incrbyfloat, keys, lastsave, lindex, linsert, llen, lpop, lpush,
900             lpushx, lrange, lrem, lset, ltrim, mget, migrate, move, mset, msetnx, object,
901             object_refcount, object_encoding, object_idletime, persist, pexpire, pexpireat,
902             pfadd, pfcount, pfmerge, ping, psetex, pttl, publish, pubsub, pubsub_channels, pubsub_numsub,
903             pubsub_numpat, quit, randomkey, rename, renamenx, restore, rpop, rpoplpush,
904             rpush, rpushx, sadd, save, scan, scard, script, script_exists, script_flush,
905             script_kill, script_load, sdiff, sdiffstore, select, set, setbit, setex, setnx,
906             setrange, sinter, sinterstore, sismember, slaveof, slowlog, smembers, smove,
907             sort, spop, srandmember, srem, sscan strlen, sunion, sunionstore, time,
908             ttl, type, unwatch, watch, zadd, zcard, zcount, zincrby, zinterstore,
909             zlexcount, zrange, zrangebylex, zrangebyscore, zrank, zrem, zremrangebylex,
910             zremrangebyrank, zremrangebyscore, zrevrange, zrevrangebyscore, zrevrank,
911             zscan, zscore, zunionstore.
912              
913             See description of all commands in redis documentation at
914             L.
915              
916             =cut
917              
918             for my $command (@commands) {
919             my @uccom = split /_/, uc $command;
920 17     17   169 no strict 'refs';
  17         25  
  17         20129  
921             *{ __PACKAGE__ . "::$command" } = sub {
922 49     49   1036013 my $self = shift;
923 49 100       223 if ( ref $_[-1] eq 'CODE' ) {
924 4         216 return $self->send_command( @uccom, @_ );
925             }
926             else {
927 45         353 return $self->execute( @uccom, @_ );
928             }
929             };
930             }
931              
932             =pod
933              
934             The following commands implement some additional postprocessing of the results:
935              
936             =cut
937              
938             sub _execute_with_postprocess {
939 0     0     my $self = shift;
940 0           my $ppsub = pop;
941 0 0 0       if ( $_[-1] && ref $_[-1] eq 'CODE' ) {
942 0           my $orig = pop;
943             my $cb = sub {
944 0     0     my ( $redis, $reply ) = @_;
945 0 0         $reply = $ppsub->($reply) unless _is_redisdb_error($reply);
946 0           $orig->( $redis, $reply );
947 0           };
948 0           return $self->send_command( @_, $cb );
949             }
950             else {
951 0           my $reply = $self->execute(@_);
952 0 0         $reply = $ppsub->($reply) unless _is_redisdb_error($reply);
953 0           return $reply;
954             }
955             }
956              
957             =head2 $self->info([\&callback])
958              
959             return information and statistics about the server. Redis-server returns
960             information in form of I, the I method parses result and
961             returns it as a hash reference.
962              
963             =cut
964              
965             sub info {
966 0     0 1   my $self = shift;
967 0           return $self->_execute_with_postprocess('INFO', @_, \&_parse_info);
968             }
969              
970             sub _parse_info {
971 0     0     my $info = shift;
972 0 0 0       return $info if !$info || ref $info;
973 0           my %info = map { /^([^:]+):(.*)$/ } split /\r\n/, $info;
  0            
974 0           return \%info;
975             }
976              
977             =head2 $self->client_list([\&callback])
978              
979             return list of clients connected to the server. This method parses server
980             output and returns result as reference to array of hashes.
981              
982             =cut
983              
984             sub client_list {
985 0     0 1   my $self = shift;
986 0           return $self->_execute_with_postprocess('CLIENT', 'LIST', @_, \&_parse_client_list);
987             }
988              
989             sub _parse_client_list {
990 0     0     my $list = shift;
991 0 0 0       return $list if !$list || ref $list;
992 0           my @clients = split /\015?\012/, $list;
993 0           my $res = [];
994 0           for (@clients) {
995 0 0         my %cli = map { /^([^=]+)=(.*)$/ ? ( $1, $2 ) : () } split / /;
  0            
996 0           push @$res, \%cli;
997             }
998 0           return $res;
999             }
1000              
1001             =head2 $self->cluster_info([\&callback])
1002              
1003             return information and statistics about the cluster. Redis-server returns
1004             information in form of I, the I method parses result
1005             and returns it as a hash reference.
1006              
1007             =cut
1008              
1009             sub cluster_info {
1010 0     0 1   my $self = shift;
1011 0           return $self->_execute_with_postprocess('CLUSTER', 'INFO', @_, \&_parse_info);
1012             }
1013              
1014             =head2 $self->cluster_nodes([\&callback])
1015              
1016             return list of cluster nodes. Each node represented as a hash with the
1017             following keys: node_id, address, host, port, flags, master_id, last_ping_sent,
1018             last_pong_received, link_state, slots.
1019              
1020             =cut
1021              
1022             sub cluster_nodes {
1023 0     0 1   my $self = shift;
1024             return $self->_execute_with_postprocess( 'CLUSTER', 'NODES', @_,
1025 0     0     sub { $self->_parse_cluster_nodes(@_) } );
  0            
1026             }
1027              
1028             sub _parse_cluster_nodes {
1029 0     0     my ($self, $list) = @_;
1030              
1031 0           my @nodes;
1032 0           for ( split /^/, $list ) {
1033 0           my ( $node_id, $addr, $flags, $master_id, $ping, $pong, $state, @slots ) =
1034             split / /;
1035 0           my %flags = map { $_ => 1 } split /,/, $flags;
  0            
1036 0           my ( $host_port ) = split /@/, $addr;
1037 0           my ( $host, $port ) = split /:([^:]+)$/, $host_port;
1038 0 0         unless ($host) {
1039 0           $host = $self->{host}, $addr = "$self->{host}:$port",
1040             }
1041 0           my $node = {
1042             node_id => $node_id,
1043             address => $addr,
1044             host => $host,
1045             port => $port,
1046             flags => \%flags,
1047             master_id => $master_id,
1048             last_ping_sent => $ping,
1049             last_pong_received => $pong,
1050             link_state => $state,
1051             slots => \@slots,
1052             };
1053 0           push @nodes, $node;
1054             }
1055              
1056 0           return \@nodes;
1057             }
1058              
1059             sub _parse_role {
1060 0     0     my $role = shift;
1061              
1062 0           my $parsed = {
1063             role => $role->[0],
1064             };
1065 0 0         if ( $parsed->{role} eq 'master' ) {
    0          
    0          
1066 0           $parsed->{replication_offset} = $role->[1];
1067 0           for ( @{ $role->[2] } ) {
  0            
1068 0           push @{ $parsed->{slaves} },
  0            
1069             {
1070             host => $_->[0],
1071             port => $_->[1],
1072             replication_offset => $_->[2],
1073             };
1074             }
1075             }
1076             elsif ( $parsed->{role} eq 'slave' ) {
1077             $parsed->{master} = {
1078 0           host => $role->[1],
1079             port => $role->[2],
1080             };
1081 0           $parsed->{status} = $role->[3];
1082 0           $parsed->{replication_offset} = $role->[4];
1083             }
1084             elsif ( $parsed->{role} eq 'sentinel' ) {
1085 0           for ( @{ $role->[1] } ) {
  0            
1086 0           push @{ $parsed->{services} }, $_;
  0            
1087             }
1088             } else {
1089 0           confess "Unknown role $parsed->{role}";
1090             }
1091              
1092 0           return $parsed;
1093             }
1094              
1095             =head2 $self->role([\&callback])
1096              
1097             return reference to a hash describing the role of the server. Hash contains
1098             "role" element that can be either "master", "slave", or "sentinel". For master
1099             hash will also contain "replication_offset" and "slaves" elements, for slave it
1100             will contain "master", "status", and "replication_offset" elements, and for
1101             sentinel it will contain "services".
1102              
1103             =cut
1104              
1105             sub role {
1106 0     0 1   my $self = shift;
1107 0           return $self->_execute_with_postprocess( 'ROLE', @_, \&_parse_role );
1108             }
1109              
1110             =head2 $self->shutdown
1111              
1112             Shuts the redis server down. Returns undef, as the server doesn't send the
1113             answer. Croaks in case of the error.
1114              
1115             =cut
1116              
1117             sub shutdown {
1118 0     0 1   my $self = shift;
1119 0           $self->send_command_cb( 'SHUTDOWN', @_ );
1120 0           return;
1121             }
1122              
1123             =head2 $self->scan_all([MATCH => $pattern,][COUNT => $count,])
1124              
1125             this method starts a new SCAN iteration and executes SCAN commands till cursor
1126             returned by server is 0. It then returns all the keys returned by server during
1127             the iteration. MATCH and COUNT are passed to SCAN command. In case of success
1128             returns reference to array with matching keys, in case of error dies or returns
1129             L object depending on I option.
1130              
1131             =cut
1132              
1133             sub scan_all {
1134 0     0 1   my $self = shift;
1135 0 0         if ( ref $_[-1] eq 'CODE' ) {
1136 0           croak "scan_all does not accept callback parameter";
1137             }
1138 0           my $cursor = 0;
1139 0           my @result;
1140 0           do {
1141 0           my $res = $self->execute( 'SCAN', $cursor, @_ );
1142              
1143             # in case of error just return it
1144 0 0         return $res unless ref $res eq 'ARRAY';
1145 0           $cursor = $res->[0];
1146 0           push @result, @{ $res->[1] };
  0            
1147             } while $cursor;
1148 0           return \@result;
1149             }
1150              
1151             =head2 $self->hscan_all($key, [MATCH => $pattern,][COUNT => $count,])
1152              
1153             =head2 $self->sscan_all($key, [MATCH => $pattern,][COUNT => $count,])
1154              
1155             =head2 $self->zscan_all($key, [MATCH => $pattern,][COUNT => $count,])
1156              
1157             these three methods are doing the same thing as I except that they
1158             require a key as the first parameter, and they iterate using HSCAN, SSCAN and
1159             ZSCAN commands.
1160              
1161             =cut
1162              
1163             for my $command (qw(hscan sscan zscan)) {
1164             my $uccom = uc $command;
1165 17     17   123 no strict 'refs';
  17         35  
  17         22608  
1166             my $name = "${command}_all";
1167             *{ __PACKAGE__ . "::$name" } = sub {
1168 0     0     my $self = shift;
1169 0           my $key = shift;
1170 0 0         if ( ref $_[-1] eq 'CODE' ) {
1171 0           croak "$name does not accept callback parameter";
1172             }
1173 0           my $cursor = 0;
1174 0           my @result;
1175 0           do {
1176 0           my $res = $self->execute( $uccom, $key, $cursor, @_ );
1177 0 0         return $res unless ref $res eq 'ARRAY';
1178 0           $cursor = $res->[0];
1179 0           push @result, @{ $res->[1] };
  0            
1180             } while $cursor;
1181 0           return \@result;
1182             };
1183             }
1184              
1185             =head1 UTF-8 SUPPORT
1186              
1187             The redis protocol is designed to work with the binary data, both keys and
1188             values are encoded in the same way as sequences of octets. By default this
1189             module expects all data to be just strings of bytes. There is an option to
1190             treat all data as UTF-8 strings. If you pass I parameter to the
1191             constructor, module will encode all strings to UTF-8 before sending them to
1192             server, and will decode all strings received from server from UTF-8. This has
1193             following repercussions you should be aware off: first, you can't store binary
1194             data on server with this option on, it would be treated as a sequence of latin1
1195             characters, and would be converted into a corresponding sequence of UTF-8
1196             encoded characters; second, if data returned by the server is not a valid UTF-8
1197             encoded string, the module will croak, and you will have to reinitialize the
1198             connection. The parser only checks for invalid UTF-8 byte sequences, it doesn't
1199             check if input contains invalid code points. Generally, using this option is
1200             not recommended.
1201              
1202             =cut
1203              
1204             =head1 ERROR HANDLING
1205              
1206             If L parameter was set to true in the constructor (which is
1207             default setting), then module will throw an exception in case network IO
1208             function returned an error, or if redis-server returned an error reply. Network
1209             exceptions belong to L or
1210             L class, if redis-server returned an error
1211             exception will be of L class. If the object was in subscription
1212             mode, you will have to restore all the subscriptions. If the object was in the
1213             middle of transaction, when after network error you will have to start the
1214             transaction again.
1215              
1216             If L parameter was disabled, then instead of throwing an
1217             exception, module will return exception object and also pass this exception
1218             object to every callback waiting for the reply from the server. If the object
1219             is in subscription mode, then module will automatically restore all
1220             subscriptions after reconnect. Note, that during transaction L is
1221             always enabled, so any error will throw an exception.
1222              
1223             =cut
1224              
1225             =head1 HANDLING OF SERVER DISCONNECTS
1226              
1227             Redis server may close a connection if it was idle for some time, also the
1228             connection may be closed in case when redis-server was restarted, or just
1229             because of the network problem. RedisDB always tries to restore connection to
1230             the server if no data has been lost as a result of disconnect, and if
1231             L parameter disabled it will try to reconnect even if disconnect
1232             happened during data transmission. E.g. if the client was idle for some time
1233             and the redis server closed the connection, it will be transparently restored
1234             when you send a command next time no matter if L enabled or not.
1235             If you sent a command and the server has closed the connection without sending
1236             a complete reply, then module will act differently depending on L
1237             value. If L enabled, the module will cancel all current
1238             callbacks, reset the object to the initial state, and throw an exception of
1239             L class, next time you use the object it will
1240             establish a new connection. If L disabled, the module will pass
1241             L object to all outstanding callbacks and will
1242             try to reconnect to the server; it will also automatically restore
1243             subscriptions if object was in subscription mode. Module never tries to
1244             reconnect after MULTI or WATCH command was sent to server and before
1245             corresponding UNWATCH, EXEC or DISCARD was sent as this may cause data
1246             corruption, so during transaction module behaves like if L is
1247             set.
1248              
1249             Module makes several attempts to reconnect each time increasing interval before
1250             the next attempt, depending on the values of L and
1251             L. After each failed attempt to connect module will
1252             invoke L callback which for example may change redis-server
1253             hostname, so on next attempt module will try to connect to different server.
1254              
1255             =cut
1256              
1257             =head1 PIPELINING
1258              
1259             You can send commands in the pipelining mode. It means you are sending multiple
1260             commands to the server without waiting for the replies. This is implemented by
1261             the I method. Recommended way of using it is to pass a reference
1262             to the callback function as the last argument. When module receives reply from
1263             the server, it will call this function with two arguments: reference to the
1264             RedisDB object, and reply from the server. It is important to understand
1265             though, that RedisDB does not run any background threads, neither it checks for
1266             the replies by setting some timer, so e.g. in the following example callback
1267             will never be invoked:
1268              
1269             my $pong;
1270             $redis->send_command( "ping", sub { $pong = $_[1] } );
1271             sleep 1 while not $pong; # this will never return
1272              
1273             Therefore you need periodically trigger check for the replies. The check is
1274             triggered when you call the following methods: I, I,
1275             I, I. Calling wrapper method, like
1276             C<< $redis->get('key') >>, will also trigger check as internally wrapper methods
1277             use methods listed above.
1278              
1279             Also you can omit callback argument when invoke I. In this case
1280             you have to fetch reply later explicitly using I method. This is how
1281             synchronous I is implemented, basically it is:
1282              
1283             sub execute {
1284             my $self = shift;
1285             $self->send_command(@_);
1286             return $self->get_reply;
1287             }
1288              
1289             That is why it is not allowed to call I unless you have got replies to
1290             all commands sent previously with I without callback. Using
1291             I without callback is not recommended.
1292              
1293             Sometimes you are not interested in replies sent by the server, e.g. SET
1294             command usually just return 'OK', in this case you can pass to I
1295             callback which ignores its arguments, or use C constant, it
1296             is a no-op function:
1297              
1298             for (@keys) {
1299             # execute will not just send 'GET' command to the server,
1300             # but it will also receive response to the 'SET' command sent on
1301             # the previous loop iteration
1302             my $val = $redis->execute( "get", $_ );
1303             $redis->send_command( "set", $_, fun($val), RedisDB::IGNORE_REPLY );
1304             }
1305             # and this will wait for the last reply
1306             $redis->mainloop;
1307              
1308             or using L you can rewrite it as:
1309              
1310             for (@keys) {
1311             my $val = $redis->get($_);
1312             $redis->set( $_, fun($val), RedisDB::IGNORE_REPLY );
1313             }
1314             $redis->mainloop;
1315              
1316             =cut
1317              
1318             =head1 PUB/SUB MESSAGING
1319              
1320             RedisDB supports subscriptions to redis channels. In the subscription mode you
1321             can subscribe to some channels and receive all the messages sent to these
1322             channels. You can subscribe to channels and then manually check messages using
1323             I method, or you can invoke I method, which will
1324             block in loop waiting for messages and invoking callback for each received
1325             message. In the first case you can use I and I methods
1326             to subscribe to channels and then you can use I method to get
1327             messages from the channel:
1328              
1329             $redis->subscribe(
1330             foo => sub {
1331             my ( $redis, $channel, $patern, $message ) = @_;
1332             print "Foo: $message\n";
1333             }
1334             );
1335             # Wait for messages
1336             $res = $redis->get_reply;
1337              
1338             I method for messages from the channel will invoke callback
1339             specified as the second optional argument of the I method and will
1340             also return raw replies from the server, both for messages from the channels
1341             and for informational messages from the redis server. If you do not want to
1342             block in I method, you can check if there are any messages using
1343             I method.
1344              
1345             In the second case you invoke I method, it subscribes to
1346             specified channels and waits for messages, when a message arrived it invokes
1347             callback defined for the channel from which the message came. Here is an
1348             example:
1349              
1350             my $message_cb = sub {
1351             my ( $redis, $channel, $pattern, $message ) = @_;
1352             print "$channel: $message\n";
1353             };
1354              
1355             my $control_cb = sub {
1356             my ( $redis, $channel, $pattern, $message ) = @_;
1357             if ( $channel eq 'control.quit' ) {
1358             $redis->unsubscribe;
1359             $redis->punsubscribe;
1360             }
1361             elsif ( $channel eq 'control.subscribe' ) {
1362             $redis->subscribe($message);
1363             }
1364             };
1365              
1366             $redis->subscription_loop(
1367             subscribe => [ 'news', ],
1368             psubscribe => [ 'control.*' => $control_cb ],
1369             default_callback => $message_cb,
1370             );
1371              
1372             subscription_loop will subscribe you to the "news" channel and "control.*"
1373             channels. It will call specified callbacks every time a new message received.
1374             When message came from "control.subscribe" channel, callback subscribes to an
1375             additional channel. When message came from "control.quit" channel, callback
1376             unsubscribes from all channels.
1377              
1378             Callbacks used in subscription mode receive four arguments: the RedisDB object,
1379             the channel from which the message came, the pattern if you subscribed to this
1380             channel using I method, and the message itself.
1381              
1382             Once you switched into subscription mode using either I or
1383             I command, or by entering I, you only can send
1384             I, I, I, and I commands to
1385             the server, other commands will throw an exception.
1386              
1387             You can publish messages into the channels using the I method. This
1388             method should be called when you in the normal mode, and can't be used while
1389             you're in the subscription mode.
1390              
1391             Following methods can be used in subscription mode:
1392              
1393             =cut
1394              
1395             =head2 $self->subscription_loop(%parameters)
1396              
1397             Enter into the subscription mode. The method subscribes you to the specified
1398             channels, waits for the messages, and invokes the appropriate callback for
1399             every received message. The method returns after you unsubscribed from all the
1400             channels. It accepts the following parameters:
1401              
1402             =over 4
1403              
1404             =item default_callback
1405              
1406             reference to the default callback. This callback is invoked for a message if you
1407             didn't specify other callback for the channel this message comes from.
1408              
1409             =item subscribe
1410              
1411             an array reference. Contains the list of channels you want to subscribe. A
1412             channel name may be optionally followed by the reference to a callback function
1413             for this channel. E.g.:
1414              
1415             [ 'news', 'messages', 'errors' => \&error_cb, 'other' ]
1416              
1417             channels "news", "messages", and "other" will use default callback, but for
1418             the "errors" channel error_cb function will be used.
1419              
1420             =item psubscribe
1421              
1422             same as subscribe, but you specify patterns for channels' names.
1423              
1424             =back
1425              
1426             All parameters are optional, but you must subscribe at least to one channel. Also
1427             if default_callback is not specified, you have to explicitly specify a callback
1428             for every channel you are going to subscribe.
1429              
1430             =cut
1431              
1432             sub subscription_loop {
1433 0     0 1   my ( $self, %args ) = @_;
1434 0 0         croak "Already in subscription loop" if $self->{_subscription_loop} > 0;
1435 0 0         croak "You can't start subscription loop while in pipelining mode."
1436             if $self->replies_to_fetch;
1437 0   0       $self->{_subscribed} ||= {};
1438 0   0       $self->{_psubscribed} ||= {};
1439 0           $self->{_subscription_cb} = $args{default_callback};
1440 0           $self->{_subscription_loop} = 1;
1441 0           $self->{_parser}->set_default_callback( \&_queue );
1442              
1443 0 0         if ( $args{subscribe} ) {
1444 0           while ( my $channel = shift @{ $args{subscribe} } ) {
  0            
1445 0           my $cb;
1446 0 0         $cb = shift @{ $args{subscribe} } if ref $args{subscribe}[0] eq 'CODE';
  0            
1447 0           $self->subscribe( $channel, $cb );
1448             }
1449             }
1450 0 0         if ( $args{psubscribe} ) {
1451 0           while ( my $channel = shift @{ $args{psubscribe} } ) {
  0            
1452 0           my $cb;
1453 0 0         $cb = shift @{ $args{psubscribe} } if ref $args{psubscribe}[0] eq 'CODE';
  0            
1454 0           $self->psubscribe( $channel, $cb );
1455             }
1456             }
1457             croak "You must subscribe at least to one channel"
1458 0 0 0       unless ( keys %{ $self->{_subscribed} } or keys %{ $self->{_psubscribed} } );
  0            
  0            
1459              
1460 0           while ( $self->{_subscription_loop} ) {
1461 0           $self->get_reply;
1462             }
1463 0           return;
1464             }
1465              
1466             =head2 $self->subscribe($channel[, \&callback])
1467              
1468             Subscribe to the I<$channel>. If I<$callback> is not specified, default
1469             callback will be used in subscription loop, or messages will be returned by
1470             I if you are not using subscription loop.
1471              
1472             =cut
1473              
1474             sub subscribe {
1475 0     0 1   my ( $self, $channel, $callback ) = @_;
1476 0 0         unless ( $self->{_subscription_loop} ) {
1477 0           $self->{_subscription_loop} = -1;
1478 0           $self->{_subscription_cb} = \&_queue;
1479 0           $self->{_parser}->set_default_callback( \&_queue );
1480             }
1481 0 0         croak "Subscribe to what channel?" unless length $channel;
1482 0 0         if ( $self->{_subscription_loop} > 0 ) {
1483             $callback ||= $self->{_subscription_cb}
1484 0 0 0       or croak "Callback for $channel not specified, neither default callback defined";
1485             }
1486             else {
1487 0   0 0     $callback ||= sub { 1 };
  0            
1488             }
1489 0           $self->{_subscribed}{$channel} = $callback;
1490 0           $self->send_command( "SUBSCRIBE", $channel, \&_queue );
1491 0           return;
1492             }
1493              
1494             =head2 $self->psubscribe($pattern[, \&callback])
1495              
1496             Subscribe to channels matching I<$pattern>. If I<$callback> is not specified,
1497             default callback will be used in subscription loop, or messages will be
1498             returned by I if you are not using subscription loop.
1499              
1500             =cut
1501              
1502             sub psubscribe {
1503 0     0 1   my ( $self, $channel, $callback ) = @_;
1504 0 0         unless ( $self->{_subscription_loop} ) {
1505 0           $self->{_subscription_loop} = -1;
1506 0           $self->{_subscription_cb} = \&_queue;
1507 0           $self->{_parser}->set_default_callback( \&_queue );
1508             }
1509 0 0         croak "Subscribe to what channel?" unless length $channel;
1510 0 0         if ( $self->{_subscription_loop} > 0 ) {
1511             $callback ||= $self->{_subscription_cb}
1512 0 0 0       or croak "Callback for $channel not specified, neither default callback defined";
1513             }
1514             else {
1515 0   0 0     $callback ||= sub { 1 };
  0            
1516             }
1517 0           $self->{_psubscribed}{$channel} = $callback;
1518 0           $self->send_command( "PSUBSCRIBE", $channel, \&_queue );
1519 0           return;
1520             }
1521              
1522             =head2 $self->unsubscribe([@channels])
1523              
1524             Unsubscribe from the listed I<@channels>. If no channels was specified,
1525             unsubscribe from all the channels to which you have subscribed using
1526             I.
1527              
1528             =cut
1529              
1530             sub unsubscribe {
1531 0     0 1   my $self = shift;
1532 0 0         if (@_) {
1533 0           delete $self->{_subscribed}{$_} for @_;
1534             }
1535             else {
1536 0           $self->{_subscribed} = {};
1537             }
1538 0 0 0       if ( %{ $self->{_subscribed} }
  0            
1539 0 0         or %{ $self->{_psubscribed} || {} } )
1540             {
1541 0           return $self->send_command( "UNSUBSCRIBE", @_ );
1542             }
1543             else {
1544 0           delete $self->{_subscription_loop};
1545 0           $self->{_to_be_fetched} = 0;
1546 0           return $self->_connect;
1547             }
1548             }
1549              
1550             =head2 $self->punsubscribe([@patterns])
1551              
1552             Unsubscribe from the listed I<@patterns>. If no patterns was specified,
1553             unsubscribe from all the channels to which you have subscribed using
1554             I.
1555              
1556             =cut
1557              
1558             sub punsubscribe {
1559 0     0 1   my $self = shift;
1560 0 0         if (@_) {
1561 0           delete $self->{_psubscribed}{$_} for @_;
1562             }
1563             else {
1564 0           $self->{_psubscribed} = {};
1565             }
1566 0 0 0       if ( %{ $self->{_subscribed} || {} }
  0 0          
1567 0           or %{ $self->{_psubscribed} } )
1568             {
1569 0           return $self->send_command( "PUNSUBSCRIBE", @_ );
1570             }
1571             else {
1572 0           delete $self->{_subscription_loop};
1573 0           $self->{_to_be_fetched} = 0;
1574 0           return $self->_connect;
1575             }
1576             }
1577              
1578             =head2 $self->subscribed
1579              
1580             Return list of channels to which you have subscribed using I
1581              
1582             =cut
1583              
1584             sub subscribed {
1585 0     0 1   return keys %{ shift->{_subscribed} };
  0            
1586             }
1587              
1588             =head2 $self->psubscribed
1589              
1590             Return list of channels to which you have subscribed using I
1591              
1592             =cut
1593              
1594             sub psubscribed {
1595 0     0 1   return keys %{ shift->{_psubscribed} };
  0            
1596             }
1597              
1598             =head1 TRANSACTIONS
1599              
1600             Transactions allow you to execute a sequence of commands in a single step. In
1601             order to start a transaction you should use the I method. After you
1602             have entered a transaction all the commands you issue are queued, but not
1603             executed till you call the I method. Typically these commands return
1604             string "QUEUED" as a result, but if there is an error in e.g. number of
1605             arguments, they may return an error. When you call exec, all the queued
1606             commands will be executed and exec will return a list of results for every
1607             command in the transaction. If instead of I you call I, all
1608             scheduled commands will be canceled.
1609              
1610             You can set some keys as watched. If any watched key has been changed by
1611             another client before you called exec, the transaction will be discarded and
1612             exec will return false value.
1613              
1614             =cut
1615              
1616             =head2 $self->watch(@keys[, \&callback])
1617              
1618             mark given keys to be watched
1619              
1620             =cut
1621              
1622             sub watch {
1623 0     0 1   my $self = shift;
1624              
1625 0           $self->{_watching} = 1;
1626 0 0         if ( ref $_[-1] eq 'CODE' ) {
1627 0           return $self->send_command( 'WATCH', @_ );
1628             }
1629             else {
1630 0           return $self->execute( 'WATCH', @_ );
1631             }
1632             }
1633              
1634             =head2 $self->unwatch([\&callback])
1635              
1636             unwatch all keys
1637              
1638             =cut
1639              
1640             sub unwatch {
1641 0     0 1   my $self = shift;
1642              
1643 0           my $res;
1644 0 0         if ( ref $_[-1] eq 'CODE' ) {
1645 0           $res = $self->send_command( 'UNWATCH', @_ );
1646             }
1647             else {
1648 0           $res = $self->execute( 'UNWATCH', @_ );
1649             }
1650 0           $self->{_watching} = undef;
1651 0           return $res;
1652             }
1653              
1654             =head2 $self->multi([\&callback])
1655              
1656             Enter the transaction. After this and till I or I will be called,
1657             all the commands will be queued but not executed.
1658              
1659             =cut
1660              
1661             sub multi {
1662 0     0 1   my $self = shift;
1663              
1664 0 0         die "Multi calls can not be nested!" if $self->{_in_multi};
1665 0           $self->{_in_multi} = 1;
1666 0 0         if ( ref $_[-1] eq 'CODE' ) {
1667 0           return $self->send_command( 'MULTI', @_ );
1668             }
1669             else {
1670 0           return $self->execute('MULTI');
1671             }
1672             }
1673              
1674             =head2 $self->exec([\&callback])
1675              
1676             Execute all queued commands and finish the transaction. Returns a list of
1677             results for every command. Will croak if some command has failed. Also
1678             unwatches all the keys. If some of the watched keys has been changed by other
1679             client, the transaction will be canceled and I will return false.
1680              
1681             =cut
1682              
1683             sub exec {
1684 0     0 1   my $self = shift;
1685              
1686 0           my $res;
1687 0 0         if ( ref $_[-1] eq 'CODE' ) {
1688 0           $res = $self->send_command( 'EXEC', @_ );
1689             }
1690             else {
1691 0           $res = $self->execute('EXEC');
1692             }
1693 0           $self->{_in_multi} = undef;
1694 0           $self->{_watching} = undef;
1695 0           return $res;
1696             }
1697              
1698             =head2 $self->discard([\&callback])
1699              
1700             Discard all queued commands without executing them and unwatch all keys.
1701              
1702             =cut
1703              
1704             sub discard {
1705 0     0 1   my $self = shift;
1706              
1707 0           my $res;
1708 0 0         if ( ref $_[-1] eq 'CODE' ) {
1709 0           $res = $self->send_command( 'DISCARD', @_ );
1710             }
1711             else {
1712 0           $res = $self->execute('DISCARD');
1713             }
1714 0           $self->{_in_multi} = undef;
1715 0           $self->{_watching} = undef;
1716 0           return $res;
1717             }
1718              
1719             =head1 CLUSTER SUPPORT
1720              
1721             For accessing redis cluster use L package
1722              
1723             =head1 SENTINEL SUPPORT
1724              
1725             For accessing redis servers managed by sentinel use L package
1726              
1727             =cut
1728              
1729             1;
1730              
1731             __END__