File Coverage

blib/lib/Mojo/Redis/PubSub.pm
Criterion Covered Total %
statement 42 79 53.1
branch 14 44 31.8
condition 16 29 55.1
subroutine 9 21 42.8
pod 10 10 100.0
total 91 183 49.7


line stmt bran cond sub pod time code
1             package Mojo::Redis::PubSub;
2 19     19   730 use Mojo::Base 'Mojo::EventEmitter';
  19         37  
  19         270  
3              
4 19     19   4143 use Mojo::JSON qw(from_json to_json);
  19         24302  
  19         1967  
5              
6 19     19   127 use constant DEBUG => $ENV{MOJO_REDIS_DEBUG};
  19         38  
  19         41920  
7              
8             has connection => sub {
9             my $self = shift;
10             my $conn = $self->redis->_connection;
11              
12             Scalar::Util::weaken($self);
13             for my $name (qw(close error response)) {
14             my $handler = "_on_$name";
15             $conn->on($name => sub { $self and $self->$handler(@_) });
16             }
17              
18             return $conn;
19             };
20              
21             has db => sub {
22             my $self = shift;
23             my $db = $self->redis->db;
24             Scalar::Util::weaken($db->{redis});
25             return $db;
26             };
27              
28             has reconnect_interval => 1;
29             has redis => sub { Carp::confess('redis is requried in constructor') };
30              
31 0     0 1 0 sub channels_p { shift->db->call_p(qw(PUBSUB CHANNELS), @_) }
32 0 0   0 1 0 sub json { ++$_[0]{json}{$_[1]} and return $_[0] }
33              
34             sub keyspace_listen {
35 1     1 1 4 my ($self, $cb) = (shift, pop);
36 1         5 my $key = $self->_keyspace_key(@_);
37 1         5 $self->{keyspace_listen}{$key} = 1;
38 1         7 return $self->listen($key, $cb);
39             }
40              
41             sub keyspace_unlisten {
42 2 100   2 1 1514 my ($self, $cb) = (shift, ref $_[-1] eq 'CODE' ? pop : undef);
43 2         8 return $self->unlisten($self->_keyspace_key(@_), $cb);
44             }
45              
46             sub listen {
47 1     1 1 3 my ($self, $name, $cb) = @_;
48              
49 1 50 50     3 unless (@{$self->{chans}{$name} ||= []}) {
  1         12  
50 1 50       5 Mojo::IOLoop->remove(delete $self->{reconnect_tid}) if $self->{reconnect_tid};
51 1 50       11 $self->_write([($name =~ /\*/ ? 'PSUBSCRIBE' : 'SUBSCRIBE') => $name]);
52             }
53              
54 1         23 push @{$self->{chans}{$name}}, $cb;
  1         6  
55 1         5 return $cb;
56             }
57              
58             sub notify_p {
59 0     0 1 0 my ($self, $name, $payload) = @_;
60 0 0       0 $payload = to_json $payload if $self->{json}{$name};
61 0         0 return $self->db->call_p(PUBLISH => $name, $payload);
62             }
63              
64 0     0 1 0 sub notify { shift->notify_p(@_)->wait }
65 0     0 1 0 sub numpat_p { shift->db->call_p(qw(PUBSUB NUMPAT)) }
66 0     0 1 0 sub numsub_p { shift->db->call_p(qw(PUBSUB NUMSUB), @_)->then(\&_flatten) }
67              
68             sub unlisten {
69 2     2 1 7 my ($self, $name, $cb) = @_;
70 2         5 my $chans = $self->{chans}{$name};
71              
72 2 100       7 @$chans = $cb ? grep { $cb ne $_ } @$chans : ();
  1         6  
73 2 50       7 unless (@$chans) {
74 2         9 my $conn = $self->connection;
75 2 0       17 $conn->write(($name =~ /\*/ ? 'PUNSUBSCRIBE' : 'UNSUBSCRIBE'), $name) if $conn->is_connected;
    50          
76 2         5 delete $self->{chans}{$name};
77             }
78              
79 2         23 return $self;
80             }
81              
82 0     0   0 sub _flatten { +{@{$_[0]}} }
  0         0  
83              
84             sub _keyspace_key {
85 10 100   10   99 my $args = ref $_[-1] eq 'HASH' ? pop : {};
86 10         19 my $self = shift;
87              
88 10   100     84 local $args->{key} = $_[0] // $args->{key} // '*';
      100        
89 10   66     118 local $args->{op} = $_[1] // $args->{op} // '*';
      50        
90 10   66     61 local $args->{type} = $args->{type} || ($args->{key} eq '*' ? 'keyevent' : 'keyspace');
91              
92             return sprintf '__%s@%s__:%s', $args->{type}, $args->{db} // $self->redis->url->path->[0] // '*',
93 10 100 100     105 $args->{type} eq 'keyevent' ? $args->{op} : $args->{key};
      100        
94             }
95              
96             sub _on_close {
97 0     0   0 my $self = shift;
98 0         0 $self->emit(disconnect => $self->connection);
99              
100 0         0 my $delay = $self->reconnect_interval;
101 0 0 0     0 return $self if $delay < 0 or $self->{reconnect_tid};
102              
103 0         0 warn qq([Mojo::Redis::PubSub] Reconnecting in ${delay}s...\n) if DEBUG;
104 0         0 Scalar::Util::weaken($self);
105 0         0 $self->{reconnect} = 1;
106 0 0   0   0 $self->{reconnect_tid} = Mojo::IOLoop->timer($delay => sub { $self and $self->_reconnect });
  0         0  
107 0         0 return $self;
108             }
109              
110 0     0   0 sub _on_error { $_[0]->emit(error => $_[2]) }
111              
112             sub _on_response {
113 0     0   0 my ($self, $conn, $res) = @_;
114 0 0       0 $self->emit(reconnect => $conn) if delete $self->{reconnect};
115              
116 0 0       0 return unless ref $res eq 'ARRAY';
117 0 0       0 return $self->emit(@$res) unless $res->[0] =~ m!^p?message$!i;
118 0 0       0 my ($psub) = $res->[0] eq 'pmessage' ? splice @$res, 1, 1 : ();
119 0 0       0 $res->[2] = eval { from_json $res->[2] } if $self->{json}{$res->[1]};
  0         0  
120 0   0     0 my $keyspace_listen = $self->{keyspace_listen}{$psub || $res->[1]};
121 0 0 0     0 for my $cb (@{$self->{chans}{$psub || $res->[1]}}) { $self->$cb($keyspace_listen ? [@$res[1, 2]] : $res->[2]) }
  0         0  
  0         0  
122             }
123              
124             sub _reconnect {
125 0     0   0 my $self = shift;
126 0         0 delete $self->{$_} for qw(before_connect connection reconnect_tid);
127 0 0       0 $self->_write(map { [(/\*/ ? 'PSUBSCRIBE' : 'SUBSCRIBE') => $_] } keys %{$self->{chans}});
  0         0  
  0         0  
128             }
129              
130             sub _write {
131 1     1   3 my ($self, @commands) = @_;
132 1         7 my $conn = $self->connection;
133 1 50       27 $self->emit(before_connect => $conn) unless $self->{before_connect}++;
134 1         18 $conn->write(@$_) for @commands;
135             }
136              
137             1;
138              
139             =encoding utf8
140              
141             =head1 NAME
142              
143             Mojo::Redis::PubSub - Publish and subscribe to Redis messages
144              
145             =head1 SYNOPSIS
146              
147             use Mojo::Redis;
148              
149             my $redis = Mojo::Redis->new;
150             my $pubsub = $redis->pubsub;
151              
152             $pubsub->listen("user:superwoman:messages" => sub {
153             my ($pubsub, $message) = @_;
154             say "superwoman got a message: $message";
155             });
156              
157             $pubsub->notify("user:batboy:messages", "How are you doing?");
158              
159             See L<https://github.com/jhthorsen/mojo-redis/blob/master/examples/chat.pl>
160             for an example L<Mojolicious> application.
161              
162             =head1 DESCRIPTION
163              
164             L<Mojo::Redis::PubSub> is an implementation of the Redis Publish/Subscribe
165             messaging paradigm. This class has the same API as L<Mojo::Pg::PubSub>, so
166             you can easily switch between the backends.
167              
168             This object holds one connection for receiving messages, and one connection
169             for sending messages. They are created lazily the first time L</listen> or
170             L</notify> is called. These connections do not affect the connection pool
171             for L<Mojo::Redis>.
172              
173             See L<https://redis.io/topics/pubsub> for more details.
174              
175             =head1 EVENTS
176              
177             =head2 before_connect
178              
179             $pubsub->on(before_connect => sub { my ($pubsub, $conn) = @_; ... });
180              
181             Emitted before L</connection> is connected to the redis server. This can be
182             useful if you want to gather the L<CLIENT ID|https://redis.io/commands/client-id>
183             or run other commands before it goes into subscribe mode.
184              
185             =head2 disconnect
186              
187             $pubsub->on(disconnect => sub { my ($pubsub, $conn) = @_; ... });
188              
189             Emitted after L</connection> is disconnected from the redis server.
190              
191             =head2 psubscribe
192              
193             $pubsub->on(psubscribe => sub { my ($pubsub, $channel, $success) = @_; ... });
194              
195             Emitted when the server responds to the L</listen> request and/or when
196             L</reconnect> resends psubscribe messages.
197              
198             This event is EXPERIMENTAL.
199              
200             =head2 reconnect
201              
202             $pubsub->on(reconnect => sub { my ($pubsub, $conn) = @_; ... });
203              
204             Emitted after switching the L</connection> with a new connection. This event
205             will only happen if L</reconnect_interval> is 0 or more.
206              
207             =head2 subscribe
208              
209             $pubsub->on(subscribe => sub { my ($pubsub, $channel, $success) = @_; ... });
210              
211             Emitted when the server responds to the L</listen> request and/or when
212             L</reconnect> resends subscribe messages.
213              
214             This event is EXPERIMENTAL.
215              
216             =head1 ATTRIBUTES
217              
218             =head2 db
219              
220             $db = $pubsub->db;
221              
222             Holds a L<Mojo::Redis::Database> object that will be used to publish messages
223             or run other commands that cannot be run by the L</connection>.
224              
225             =head2 connection
226              
227             $conn = $pubsub->connection;
228              
229             Holds a L<Mojo::Redis::Connection> object that will be used to subscribe to
230             channels.
231              
232             =head2 reconnect_interval
233              
234             $interval = $pubsub->reconnect_interval;
235             $pubsub = $pubsub->reconnect_interval(1);
236             $pubsub = $pubsub->reconnect_interval(0.1);
237             $pubsub = $pubsub->reconnect_interval(-1);
238              
239             The amount of time in seconds to wait to L</reconnect> after disconnecting.
240             Default is 1 (second). L</reconnect> can be disabled by setting this to a
241             negative value.
242              
243             =head2 redis
244              
245             $redis = $pubsub->redis;
246             $pubsub = $pubsub->redis(Mojo::Redis->new);
247              
248             Holds a L<Mojo::Redis> object used to create the connections to talk with Redis.
249              
250             =head1 METHODS
251              
252             =head2 channels_p
253              
254             $promise = $pubsub->channels_p->then(sub { my $channels = shift });
255             $promise = $pubsub->channels_p("pat*")->then(sub { my $channels = shift });
256              
257             Lists the currently active channels. An active channel is a Pub/Sub channel
258             with one or more subscribers (not including clients subscribed to patterns).
259              
260             =head2 json
261              
262             $pubsub = $pubsub->json("foo");
263              
264             Activate automatic JSON encoding and decoding with L<Mojo::JSON/"to_json"> and
265             L<Mojo::JSON/"from_json"> for a channel.
266              
267             # Send and receive data structures
268             $pubsub->json("foo")->listen(foo => sub {
269             my ($pubsub, $payload) = @_;
270             say $payload->{bar};
271             });
272             $pubsub->notify(foo => {bar => 'I ♥ Mojolicious!'});
273              
274             =head2 keyspace_listen
275              
276             $cb = $pubsub->keyspace_listen(\%args, sub { my ($pubsub, $message) = @_ });
277             $cb = $pubsub->keyspace_listen({key => "cool:key"}, sub { my ($pubsub, $message) = @_ });
278             $cb = $pubsub->keyspace_listen({op => "del"}, sub { my ($pubsub, $message) = @_ });
279              
280             Used to listen for keyspace notifications. See L<https://redis.io/topics/notifications>
281             for more details. The channel that will be subscribed to will look like one of
282             these:
283              
284             __keyspace@${db}__:$key $op
285             __keyevent@${db}__:$op $key
286              
287             This means that "key" and "op" are mutually exclusive from the list of
288             parameters below:
289              
290             =over 2
291              
292             =item * db
293              
294             Default database to listen for events is the database set in
295             L<Mojo::Redis/url>. "*" is also a valid value, meaning listen for events
296             happening in all databases.
297              
298             =item * key
299              
300             Alternative to passing in C<$key>. Default value is "*".
301              
302             =item * op
303              
304             Alternative to passing in C<$op>. Default value is "*".
305              
306             =back
307              
308             =head2 keyspace_unlisten
309              
310             $pubsub = $pubsub->keyspace_unlisten(@args);
311             $pubsub = $pubsub->keyspace_unlisten(@args, $cb);
312              
313             Stop listening for keyspace events. See L</keyspace_listen> for details about
314             keyspace events and what C<@args> can be.
315              
316             =head2 listen
317              
318             $cb = $pubsub->listen($channel => sub { my ($pubsub, $message) = @_ });
319              
320             Subscribe to a channel. There is no limit on how many subscribers a channel
321             can have. The returned code ref can be passed on to L</unlisten>.
322              
323             =head2 notify
324              
325             $pubsub->notify($channel => $message);
326              
327             Send a plain string message to a channel. This method is the same as:
328              
329             $pubsub->notify_p($channel => $message)->wait;
330              
331             =head2 notify_p
332              
333             $p = $pubsub->notify_p($channel => $message);
334              
335             Send a plain string message to a channel and return a L<Mojo::Promise> object.
336              
337             =head2 numpat_p
338              
339             $promise = $pubsub->numpat_p->then(sub { my $int = shift });
340              
341             Returns the number of subscriptions to patterns (that are performed using the
342             PSUBSCRIBE command). Note that this is not just the count of clients
343             subscribed to patterns but the total number of patterns all the clients are
344             subscribed to.
345              
346             =head2 numsub_p
347              
348             $promise = $pubsub->numsub_p(@channels)->then(sub { my $channels = shift });
349              
350             Returns the number of subscribers (not counting clients subscribed to
351             patterns) for the specified channels as a hash-ref, where the keys are
352             channel names.
353              
354             =head2 unlisten
355              
356             $pubsub = $pubsub->unlisten($channel);
357             $pubsub = $pubsub->unlisten($channel, $cb);
358              
359             Unsubscribe from a channel.
360              
361             =head1 SEE ALSO
362              
363             L<Mojo::Redis>.
364              
365             =cut