File Coverage

blib/lib/Message/Passing/Input/Redis.pm
Criterion Covered Total %
statement 15 32 46.8
branch 0 4 0.0
condition n/a
subroutine 5 10 50.0
pod n/a
total 20 46 43.4


line stmt bran cond sub pod time code
1             package Message::Passing::Input::Redis;
2 1     1   5548 use Moo;
  1         23728  
  1         9  
3 1     1   15703 use MooX::Types::MooseLike::Base qw/ ArrayRef Str /;
  1         12929  
  1         130  
4 1     1   13 use Scalar::Util qw/ weaken /;
  1         2  
  1         47  
5 1     1   2312 use AnyEvent;
  1         48261  
  1         51  
6 1     1   12 use namespace::clean -except => 'meta';
  1         2  
  1         12  
7              
8             with qw/
9             Message::Passing::Redis::Role::HasAConnection
10             Message::Passing::Role::Input
11             /;
12              
13             has topics => (
14             isa => ArrayRef[Str],
15             coerce => sub {
16             my $str = shift;
17             return $str if ref $str eq 'ARRAY';
18             return [ $str ] unless ref $str;
19             return [];
20             },
21             is => 'ro',
22             default => sub { [] },
23             );
24              
25             has ptopics => (
26             isa => ArrayRef[Str],
27             coerce => sub {
28             my $str = shift;
29             return $str if ref $str eq 'ARRAY';
30             return [ $str ] unless ref $str;
31             return [];
32             },
33             is => 'ro',
34             default => sub { [] },
35             );
36              
37             has _handle => (
38             is => 'rw',
39             clearer => '_clear_handle',
40             );
41              
42             sub connected {
43 0     0     my ($self, $client) = @_;
44 0           weaken($self);
45 0           weaken($client);
46 0           $client->subscribe(
47             @{ $self->topics },
48             sub {
49 0     0     my ($message, $topic, $subscribed_topic) = @_;
50 0           $self->output_to->consume($message);
51             },
52 0 0         ) if @{ $self->topics };
  0            
53 0           $client->psubscribe(
54             @{ $self->ptopics },
55             sub {
56 0     0     my ($message, $topic, $subscribed_topic) = @_;
57 0           $self->output_to->consume($message);
58             },
59 0 0         ) if @{ $self->ptopics };
  0            
60             $self->_handle(AnyEvent->io(
61             fh => $client->{sock},
62             poll => "r",
63             cb => sub {
64 0     0     $client->wait_for_messages(0);
65             },
66 0           ));
67             }
68              
69             sub disconnect {
70 0     0     my ($self) = @_;
71 0           $self->_clear_handle;
72             }
73              
74             1;
75              
76             =head1 NAME
77              
78             Message::Passing::Input::Redis - A Redis consumer for Message::Passing
79              
80             =head1 SYNOPSIS
81              
82             $ message-pass --output STDOUT --input Redis --input_options '{"topics":["foo"],"hostname":"127.0.0.1","port":"6379"}'
83              
84             =head1 DESCRIPTION
85              
86             A simple subscriber a Redis PubSub topic
87              
88             =head1 ATTRIBUTES
89              
90             =head2 hostname
91              
92             The hostname of the Redis server. Required.
93              
94             =head2 port
95              
96             The port number of the Redis server. Defaults to 6379.
97              
98             =head2 topics
99              
100             A list of topics to consume messages from.
101              
102             These topic names are matched exactly.
103              
104             =head2 ptopics
105              
106             A list of pattern topics to consume messages from.
107              
108             These topic names can wildcard match, so for example C<< prefix1.* >>
109             will match topics C<< prefix1.foo >> and C<< prefix1.bar >>.
110              
111             =head1 METHODS
112              
113             =head2 connected
114              
115             Called by L to indicate a
116             connection to the Redis server has been made.
117              
118             Causes the subscription to the topic(s) to be started
119              
120             =head2 disconnect
121              
122             Called by L to indicate a
123             connection to the Redis server has failed.
124              
125             =head1 SEE ALSO
126              
127             =over
128              
129             =item L
130              
131             =item L
132              
133             =back
134              
135             =head1 AUTHOR, COPYRIGHT AND LICENSE
136              
137             See L.
138              
139             =cut
140