File Coverage

blib/lib/Message/Passing/Input/ZeroMQ.pm
Criterion Covered Total %
statement 4 6 66.6
branch n/a
condition n/a
subroutine 2 2 100.0
pod n/a
total 6 8 75.0


line stmt bran cond sub pod time code
1             package Message::Passing::Input::ZeroMQ;
2 5     5   354471 use Moo;
  5         57858  
  5         31  
3 5     5   13080 use ZMQ::FFI::Constants qw/ :all /;
  0            
  0            
4             use AnyEvent;
5             use Scalar::Util qw/ weaken /;
6             use Try::Tiny qw/ try catch /;
7             use namespace::clean -except => 'meta';
8              
9             with qw/
10             Message::Passing::ZeroMQ::Role::HasASocket
11             Message::Passing::Role::Input
12             /;
13              
14             has '+_socket' => (
15             handles => {
16             _zmq_recv => 'recv',
17             },
18             );
19              
20             sub _socket_type { 'SUB' }
21              
22             has socket_hwm => (
23             is => 'rw',
24             default => 10000,
25             );
26              
27             has subscribe => (
28             isa => sub { ref($_[0]) eq 'ARRAY' },
29             is => 'ro',
30             lazy => 1,
31             default => sub { [ '' ] }, # Subscribe to everything!
32             );
33              
34             sub setsockopt {
35             my ($self, $socket) = @_;
36              
37             if ($self->zmq_major_version >= 3){
38             $socket->set(ZMQ_RCVHWM, 'int', $self->socket_hwm);
39             }
40             else {
41             $socket->set(ZMQ_HWM, 'uint64_t', $self->socket_hwm);
42             }
43              
44             if ($self->socket_type eq 'SUB') {
45             foreach my $sub (@{ $self->subscribe }) {
46             $socket->set(ZMQ_SUBSCRIBE, "binary", $sub);
47             }
48             }
49              
50             return;
51             }
52              
53             sub _try_rx {
54             my $self = shift();
55             my $msg;
56             try {
57             $msg = $self->_zmq_recv(ZMQ_NOBLOCK);
58             };
59             if ($msg) {
60             $self->output_to->consume($msg);
61             }
62             return $msg;
63             }
64              
65             has _io_reader => (
66             is => 'ro',
67             lazy => 1,
68             default => sub {
69             my $weak_self = shift;
70             weaken($weak_self);
71             AE::io $weak_self->_socket->get_fd, 0,
72             sub { my $more; do { $more = $weak_self->_try_rx } while ($more) };
73             },
74             );
75              
76             # Note that we need this timer as ZMQ is magic..
77             # Just checking our local FD for readability will not always
78             # be enough, as the client end of ZQM may not start pushing messages to us,
79             # ergo we call ->recv explicitly on the socket to get messages
80             # which may be pre-buffered at a client as fast as possible (i.e. before
81             # the client pushes another message).
82             has _zmq_timer => (
83             is => 'ro',
84             lazy => 1,
85             default => sub {
86             my $weak_self = shift;
87             weaken($weak_self);
88             AnyEvent->timer(after => 1, interval => 1,
89             cb => sub { my $more; do { $more = $weak_self->_try_rx } while ($more) });
90             },
91             );
92              
93             sub BUILD {
94             my $self = shift;
95             $self->_io_reader;
96             $self->_zmq_timer;
97             }
98              
99             1;
100              
101             =head1 NAME
102              
103             Message::Passing::Input::ZeroMQ - input messages from ZeroMQ.
104              
105             =head1 SYNOPSIS
106              
107             message-passing --output STDOUT --input ZeroMQ --input_options '{"socket_bind":"tcp://*:5552"}'
108              
109             =head1 DESCRIPTION
110              
111             A L<Message::Passing> ZeroMQ input class.
112              
113             Can be used as part of a chain of classes with the L<message-passing> utility, or directly as
114             an input with L<Message::Passing::DSL>.
115              
116             =head1 ATTRIBUTES
117              
118             See L<Message::Passing::ZeroMQ/CONNECTION ATTRIBUTES>
119              
120             =head2 subscribe
121              
122             If the input socket is a C<SUB> socket, then the C<ZMQ_SUBSCRIBE>
123             socket option will be set once for each value in the subscribe attribute.
124              
125             Defaults to '', which means all messages are subscribed to.
126              
127             =head1 SEE ALSO
128              
129             =over
130              
131             =item L<Message::Passing::ZeroMQ>
132              
133             =item L<Message::Passing::Output::ZeroMQ>
134              
135             =item L<Message::Passing>
136              
137             =item L<ZeroMQ>
138              
139             =item L<http://www.zeromq.org/>
140              
141             =back
142              
143             =head1 SPONSORSHIP
144              
145             This module exists due to the wonderful people at Suretec Systems Ltd.
146             <http://www.suretecsystems.com/> who sponsored its development for its
147             VoIP division called SureVoIP <http://www.surevoip.co.uk/> for use with
148             the SureVoIP API -
149             <http://www.surevoip.co.uk/support/wiki/api_documentation>
150              
151             =head1 AUTHOR, COPYRIGHT AND LICENSE
152              
153             See L<Message::Passing::ZeroMQ>.
154              
155             =cut
156