File Coverage

blib/lib/Mojo/Redis/PubSub.pm
Criterion Covered Total %
statement 42 80 52.5
branch 14 46 30.4
condition 16 23 69.5
subroutine 9 21 42.8
pod 10 10 100.0
total 91 180 50.5


line stmt bran cond sub pod time code
1             package Mojo::Redis::PubSub;
2 19     19   534 use Mojo::Base 'Mojo::EventEmitter';
  19         38  
  19         267  
3              
4 19     19   3540 use Mojo::JSON qw(from_json to_json);
  19         18997  
  19         1370  
5              
6 19     19   100 use constant DEBUG => $ENV{MOJO_REDIS_DEBUG};
  19         44  
  19         34385  
7              
8             has connection => sub {
9             my $self = shift;
10             my $conn = $self->redis->_connection;
11              
12             Scalar::Util::weaken($self);
13             for my $name (qw(close error response)) {
14             my $handler = "_on_$name";
15             $conn->on($name => sub { $self and $self->$handler(@_) });
16             }
17              
18             return $conn;
19             };
20              
21             has db => sub {
22             my $self = shift;
23             my $db = $self->redis->db;
24             Scalar::Util::weaken($db->{redis});
25             return $db;
26             };
27              
28             has reconnect_interval => 1;
29             has redis => sub { Carp::confess('redis is requried in constructor') };
30              
31 0     0 1 0 sub channels_p { shift->db->call_p(qw(PUBSUB CHANNELS), @_) }
32 0 0   0 1 0 sub json { ++$_[0]{json}{$_[1]} and return $_[0] }
33              
34             sub keyspace_listen {
35 1     1 1 4 my ($self, $cb) = (shift, pop);
36 1         3 my $key = $self->_keyspace_key(@_);
37 1         3 $self->{keyspace_listen}{$key} = 1;
38 1         4 return $self->listen($key, $cb);
39             }
40              
41             sub keyspace_unlisten {
42 2 100   2 1 1038 my ($self, $cb) = (shift, ref $_[-1] eq 'CODE' ? pop : undef);
43 2         5 return $self->unlisten($self->_keyspace_key(@_), $cb);
44             }
45              
46             sub listen {
47 1     1 1 4 my ($self, $name, $cb) = @_;
48              
49 1 50 50     2 unless (@{$self->{chans}{$name} ||= []}) {
  1         8  
50 1 50       5 Mojo::IOLoop->remove(delete $self->{reconnect_tid}) if $self->{reconnect_tid};
51 1 50       6 $self->_write([($name =~ /\*/ ? 'PSUBSCRIBE' : 'SUBSCRIBE') => $name]);
52             }
53              
54 1         2 push @{$self->{chans}{$name}}, $cb;
  1         3  
55 1         4 return $cb;
56             }
57              
58             sub notify_p {
59 0     0 1 0 my ($self, $name, $payload) = @_;
60 0 0       0 $payload = to_json $payload if $self->{json}{$name};
61 0         0 return $self->db->call_p(PUBLISH => $name, $payload);
62             }
63              
64 0     0 1 0 sub notify { shift->notify_p(@_)->wait }
65 0     0 1 0 sub numpat_p { shift->db->call_p(qw(PUBSUB NUMPAT)) }
66 0     0 1 0 sub numsub_p { shift->db->call_p(qw(PUBSUB NUMSUB), @_)->then(\&_flatten) }
67              
68             sub unlisten {
69 2     2 1 5 my ($self, $name, $cb) = @_;
70 2         6 my $chans = $self->{chans}{$name};
71              
72 2 100       6 @$chans = $cb ? grep { $cb ne $_ } @$chans : ();
  1         5  
73 2 50       4 unless (@$chans) {
74 2         6 my $conn = $self->connection;
75 2 0       13 $conn->write(($name =~ /\*/ ? 'PUNSUBSCRIBE' : 'UNSUBSCRIBE'), $name) if $conn->is_connected;
    50          
76 2         5 delete $self->{chans}{$name};
77             }
78              
79 2         9 return $self;
80             }
81              
82 0     0   0 sub _flatten { +{@{$_[0]}} }
  0         0  
83              
84             sub _keyspace_key {
85 10 100   10   80 my $args = ref $_[-1] eq 'HASH' ? pop : {};
86 10         15 my $self = shift;
87              
88 10   100     50 local $args->{key} = $_[0] // $args->{key} // '*';
      100        
89 10   66     36 local $args->{op} = $_[1] // $args->{op} // '*';
      50        
90 10   66     35 local $args->{type} = $args->{type} || ($args->{key} eq '*' ? 'keyevent' : 'keyspace');
91              
92             return sprintf '__%s@%s__:%s', $args->{type}, $args->{db} // $self->redis->url->path->[0] // '*',
93 10 100 100     54 $args->{type} eq 'keyevent' ? $args->{op} : $args->{key};
      100        
94             }
95              
96             sub _on_close {
97 0     0   0 my $self = shift;
98 0         0 $self->emit(disconnect => $self->connection);
99              
100 0         0 my $delay = $self->reconnect_interval;
101 0 0 0     0 return $self if $delay < 0 or $self->{reconnect_tid};
102              
103 0         0 warn qq([Mojo::Redis::PubSub] Reconnecting in ${delay}s...\n) if DEBUG;
104 0         0 Scalar::Util::weaken($self);
105 0         0 $self->{reconnect} = 1;
106 0 0   0   0 $self->{reconnect_tid} = Mojo::IOLoop->timer($delay => sub { $self and $self->_reconnect });
  0         0  
107 0         0 return $self;
108             }
109              
110 0     0   0 sub _on_error { $_[0]->emit(error => $_[2]) }
111              
112             sub _on_response {
113 0     0   0 my ($self, $conn, $res) = @_;
114 0 0       0 $self->emit(reconnect => $conn) if delete $self->{reconnect};
115              
116             # $res = [pmessage => $name, $channel, $data]
117             # $res = [message => $channel, $data]
118              
119 0 0       0 return unless ref $res eq 'ARRAY';
120 0 0       0 return $self->emit(@$res) unless $res->[0] =~ m!^p?message$!i;
121              
122 0 0       0 my ($name) = $res->[0] eq 'pmessage' ? splice @$res, 1, 1 : ($res->[1]);
123 0         0 my $keyspace_listen = $self->{keyspace_listen}{$name};
124              
125 0         0 local $@;
126 0 0       0 $res->[2] = eval { from_json $res->[2] } if $self->{json}{$name};
  0         0  
127 0 0       0 for my $cb (@{$self->{chans}{$name} || []}) {
  0         0  
128 0 0       0 $self->$cb($keyspace_listen ? [@$res[1, 2]] : $res->[2], $res->[1]);
129             }
130             }
131              
132             sub _reconnect {
133 0     0   0 my $self = shift;
134 0         0 delete $self->{$_} for qw(before_connect connection reconnect_tid);
135 0 0       0 $self->_write(map { [(/\*/ ? 'PSUBSCRIBE' : 'SUBSCRIBE') => $_] } keys %{$self->{chans}});
  0         0  
  0         0  
136             }
137              
138             sub _write {
139 1     1   3 my ($self, @commands) = @_;
140 1         5 my $conn = $self->connection;
141 1 50       13 $self->emit(before_connect => $conn) unless $self->{before_connect}++;
142 1         16 $conn->write(@$_) for @commands;
143             }
144              
145             1;
146              
147             =encoding utf8
148              
149             =head1 NAME
150              
151             Mojo::Redis::PubSub - Publish and subscribe to Redis messages
152              
153             =head1 SYNOPSIS
154              
155             use Mojo::Redis;
156              
157             my $redis = Mojo::Redis->new;
158             my $pubsub = $redis->pubsub;
159              
160             $pubsub->listen("user:superwoman:messages" => sub {
161             my ($pubsub, $message, $channel) = @_;
162             say "superwoman got a message '$message' from channel '$channel'";
163             });
164              
165             $pubsub->notify("user:batboy:messages", "How are you doing?");
166              
167             See L
168             for example L application.
169              
170             =head1 DESCRIPTION
171              
172             L is an implementation of the Redis Publish/Subscribe
173             messaging paradigm. This class has the same API as L, so
174             you can easily switch between the backends.
175              
176             This object holds one connection for receiving messages, and one connection
177             for sending messages. They are created lazily the first time L or
178             L is called. These connections does not affect the connection pool
179             for L.
180              
181             See L for more details.
182              
183             =head1 EVENTS
184              
185             =head2 before_connect
186              
187             $pubsub->on(before_connect => sub { my ($pubsub, $conn) = @_; ... });
188              
189             Emitted before L is connected to the redis server. This can be
190             useful if you want to gather the L
191             or run other commands before it goes into subscribe mode.
192              
193             =head2 disconnect
194              
195             $pubsub->on(disconnect => sub { my ($pubsub, $conn) = @_; ... });
196              
197             Emitted after L is disconnected from the redis server.
198              
199             =head2 psubscribe
200              
201             $pubsub->on(psubscribe => sub { my ($pubsub, $channel, $success) = @_; ... });
202              
203             Emitted when the server responds to the L request and/or when
204             L resends psubscribe messages.
205              
206             This event is EXPERIMENTAL.
207              
208             =head2 reconnect
209              
210             $pubsub->on(reconnect => sub { my ($pubsub, $conn) = @_; ... });
211              
212             Emitted after switching the L with a new connection. This event
213             will only happen if L is 0 or more.
214              
215             =head2 subscribe
216              
217             $pubsub->on(subscribe => sub { my ($pubsub, $channel, $success) = @_; ... });
218              
219             Emitted when the server responds to the L request and/or when
220             L resends subscribe messages.
221              
222             This event is EXPERIMENTAL.
223              
224             =head1 ATTRIBUTES
225              
226             =head2 db
227              
228             $db = $pubsub->db;
229              
230             Holds a L object that will be used to publish messages
231             or run other commands that cannot be run by the L.
232              
233             =head2 connection
234              
235             $conn = $pubsub->connection;
236              
237             Holds a L object that will be used to subscribe to
238             channels.
239              
240             =head2 reconnect_interval
241              
242             $interval = $pubsub->reconnect_interval;
243             $pubsub = $pubsub->reconnect_interval(1);
244             $pubsub = $pubsub->reconnect_interval(0.1);
245             $pubsub = $pubsub->reconnect_interval(-1);
246              
247             The amount of time in seconds to wait to L after disconnecting.
248             Default is 1 (second). L can be disabled by setting this to a
249             negative value.
250              
251             =head2 redis
252              
253             $conn = $pubsub->connection;
254             $pubsub = $pubsub->connection(Mojo::Redis->new);
255              
256             Holds a L object used to create the connections to talk with Redis.
257              
258             =head1 METHODS
259              
260             =head2 channels_p
261              
262             $promise = $pubsub->channels_p->then(sub { my $channels = shift });
263             $promise = $pubsub->channels_p("pat*")->then(sub { my $channels = shift });
264              
265             Lists the currently active channels. An active channel is a Pub/Sub channel
266             with one or more subscribers (not including clients subscribed to patterns).
267              
268             =head2 json
269              
270             $pubsub = $pubsub->json("foo");
271              
272             Activate automatic JSON encoding and decoding with L and
273             L for a channel.
274              
275             # Send and receive data structures
276             $pubsub->json("foo")->listen(foo => sub {
277             my ($pubsub, $payload, $channel) = @_;
278             say $payload->{bar};
279             });
280             $pubsub->notify(foo => {bar => 'I ♥ Mojolicious!'});
281              
282             =head2 keyspace_listen
283              
284             $cb = $pubsub->keyspace_listen(\%args, sub { my ($pubsub, $message) = @_ }) });
285             $cb = $pubsub->keyspace_listen({key => "cool:key"}, sub { my ($pubsub, $message) = @_ }) });
286             $cb = $pubsub->keyspace_listen({op => "del"}, sub { my ($pubsub, $message) = @_ }) });
287              
288             Used to listen for keyspace notifications. See L
289             for more details. The channel that will be subscribed to will look like one of
290             these:
291              
292             __keyspace@${db}__:$key $op
293             __keyevent@${db}__:$op $key
294              
295             This means that "key" and "op" is mutually exclusive from the list of
296             parameters below:
297              
298             =over 2
299              
300             =item * db
301              
302             Default database to listen for events is the database set in
303             L. "*" is also a valid value, meaning listen for events
304             happening in all databases.
305              
306             =item * key
307              
308             Alternative to passing in C<$key>. Default value is "*".
309              
310             =item * op
311              
312             Alternative to passing in C<$op>. Default value is "*".
313              
314             =back
315              
316             =head2 keyspace_unlisten
317              
318             $pubsub = $pubsub->keyspace_unlisten(@args);
319             $pubsub = $pubsub->keyspace_unlisten(@args, $cb);
320              
321             Stop listening for keyspace events. See L for details about
322             keyspace events and what C<@args> can be.
323              
324             =head2 listen
325              
326             $cb = $pubsub->listen($channel => sub { my ($pubsub, $message, $channel) = @_ });
327              
328             Subscribe to an exact channel name
329             (L) or a channel name with a
330             pattern (L). C<$channel> in
331             the callback will be the exact channel name, without any pattern. C<$message>
332             will be the data published to that the channel.
333              
334             The returning code ref can be passed on to L.
335              
336             =head2 notify
337              
338             $pubsub->notify($channel => $message);
339              
340             Send a plain string message to a channel. This method is the same as:
341              
342             $pubsub->notify_p($channel => $message)->wait;
343              
344             =head2 notify_p
345              
346             $p = $pubsub->notify_p($channel => $message);
347              
348             Send a plain string message to a channel and returns a L object.
349              
350             =head2 numpat_p
351              
352             $promise = $pubsub->channels_p->then(sub { my $int = shift });
353              
354             Returns the number of subscriptions to patterns (that are performed using the
355             PSUBSCRIBE command). Note that this is not just the count of clients
356             subscribed to patterns but the total number of patterns all the clients are
357             subscribed to.
358              
359             =head2 numsub_p
360              
361             $promise = $pubsub->numsub_p(@channels)->then(sub { my $channels = shift });
362              
363             Returns the number of subscribers (not counting clients subscribed to
364             patterns) for the specified channels as a hash-ref, where the keys are
365             channel names.
366              
367             =head2 unlisten
368              
369             $pubsub = $pubsub->unlisten($channel);
370             $pubsub = $pubsub->unlisten($channel, $cb);
371              
372             Unsubscribe from a channel.
373              
374             =head1 SEE ALSO
375              
376             L.
377              
378             =cut