File Coverage

blib/lib/Mojolicious/Plugin/RedisSubscriber.pm
Criterion Covered Total %
statement 21 35 60.0
branch 0 6 0.0
condition n/a
subroutine 7 10 70.0
pod 3 3 100.0
total 31 54 57.4


line stmt bran cond sub pod time code
1             package Mojolicious::Plugin::RedisSubscriber;
2              
3 2     2   148833 use warnings;
  2         4  
  2         60  
4 2     2   8 use strict;
  2         2  
  2         63  
5 2     2   857 use Mojo::Base 'Mojo::EventEmitter';
  2         9415  
  2         11  
6              
7 2     2   3163 use Cache::RedisDB;
  2         54686  
  2         49  
8 2     2   939 use Mojo::Redis;
  2         245369  
  2         25  
9 2     2   1116 use YAML::XS;
  2         4565  
  2         100  
10 2     2   12 use Try::Tiny;
  2         2  
  2         1105  
11              
12             our $VERSION = "0.0.1";
13              
14             =head1 NAME
15              
16             Mojolicious::Plugin::RedisSubscriber
17              
18             =head1 SYNOPSIS
19              
20             use Mojolicious::Plugin::RedisSubscriber;
21             my $redis = Mojolicious::Plugin::RedisSubscriber->new;
22             my $cb = $redis->subscribe("feed::frxEURUSD", sub { ... });
23             ...;
24             $redis->unsubscribe("feed::frxEURUSD", $cb);
25              
26             =head1 DESCRIPTION
27              
28             Module subscribes to specified channels and emits events when there are messages.
29              
30             =cut
31              
32             has channel_hash => sub { return {'Mojo::RedisSubscriber' => 1} };
33              
34             has redis => sub {
35             my $self = shift;
36             my $_subscribe;
37             $_subscribe = sub {
38             my $channels = shift;
39             my $subs = Mojo::Redis::Subscription->new(
40             server => Cache::RedisDB->redis_server_info,
41             channels => $self->channels,
42             encoding => '',
43             timeout => 300,
44             );
45             $subs->on(close => sub { $_subscribe->($self->channels) });
46             $subs->on(error => sub { $_subscribe->($self->channels) });
47             $subs->on(timeout => sub { $_subscribe->($self->channels) });
48             $subs->on(
49             message => sub {
50             my (undef, $message, $channel) = @_;
51             if ($message && $message =~ /^--/) {
52             $message = try { Load($message) };
53             }
54             $self->emit($channel => $message);
55             },
56             );
57             $subs->connect;
58             $self->redis($subs);
59             return $subs;
60             };
61             return $_subscribe->($self->channels);
62             };
63              
64             =head1 METHODS
65              
66             Module provides the following methods:
67              
68             =cut
69              
70             =head2 $self->channel_hash
71              
72             Returns the underlying subscriber object.
73              
74             =head2 $self->channels
75              
76             Returns reference to list of channel names to which redis is subscribed.
77              
78             =cut
79              
80             sub channels {
81 0     0 1   my $self = shift;
82 0           return [keys %{$self->channel_hash}];
  0            
83             }
84              
85             =head2 $self->redis
86              
87             returns the underlying redis server object.
88              
89             =head2 $self->subscribe($channel => $callback)
90              
91             Ensures that module subscribed to specified redis channels and adds
92             I<$callback> to the list invoked when there's a message in channel
93              
94             =cut
95              
96             sub subscribe {
97 0     0 1   my ($self, $channel, $callback) = @_;
98 0           my $redis = $self->redis;
99 0 0         unless ($self->channel_hash->{$channel}++) {
100 0           $redis->execute([subscribe => "$channel"]);
101             }
102 0           return $self->on($channel => $callback);
103             }
104              
105             =head2 $self->unsubscribe($channel => $callback)
106              
107             Unsubscribes I<$callback> from the I<$channel>
108              
109             =cut
110              
111             sub unsubscribe {
112 0     0 1   my ($self, $channel, $callback) = @_;
113 0           my $redis = $self->redis;
114 0 0         return unless $self->channel_hash->{$channel};
115 0 0         unless (--$self->channel_hash->{$channel}) {
116 0           $redis->execute([unsubscribe => "$channel"]);
117             }
118 0           return $self->SUPER::unsubscribe($channel => $callback);
119             }
120              
121             =head1 FEATURES NOT YET SUPPORTED
122              
123             Configuration vs config yml files, particularly regarding default channels
124              
125             =head1 LICENSE AND COPYRIGHT
126              
127             Copyright (C) 2015 Regent Markets. This may be distributed under the same terms as Perl itself.
128              
129             =cut
130              
131             1;