File Coverage

blib/lib/Message/Passing/Input/ZeroMQ.pm
Criterion Covered Total %
statement 4 6 66.6
branch n/a
condition n/a
subroutine 2 2 100.0
pod n/a
total 6 8 75.0


line stmt bran cond sub pod time code
1             package Message::Passing::Input::ZeroMQ;
2 5     5   146991 use Moo;
  5         101493  
  5         29  
3 5     5   24294 use ZMQ::FFI::Constants qw/ :all /;
  0            
  0            
4             use AnyEvent;
5             use Scalar::Util qw/ weaken /;
6             use Try::Tiny qw/ try catch /;
7             use namespace::clean -except => 'meta';
8              
9             with qw/
10             Message::Passing::ZeroMQ::Role::HasASocket
11             Message::Passing::Role::Input
12             /;
13              
14             has '+_socket' => (
15             handles => {
16             _zmq_recv => 'recv',
17             },
18             );
19              
20             sub _socket_type { 'SUB' }
21              
22             has socket_hwm => (
23             is => 'rw',
24             default => 10000,
25             );
26              
27             has subscribe => (
28             isa => sub { ref($_[0]) eq 'ARRAY' },
29             is => 'ro',
30             lazy => 1,
31             default => sub { [ '' ] }, # Subscribe to everything!
32             );
33              
34             sub setsockopt {
35             my ($self, $socket) = @_;
36              
37             if ($self->zmq_major_version >= 3){
38             $socket->set(ZMQ_RCVHWM, 'int', $self->socket_hwm);
39             }
40             else {
41             $socket->set(ZMQ_HWM, 'uint64_t', $self->socket_hwm);
42             }
43              
44             if ($self->socket_type eq 'SUB') {
45             foreach my $sub (@{ $self->subscribe }) {
46             $socket->set(ZMQ_SUBSCRIBE, "binary", $sub);
47             }
48             }
49              
50             return;
51             }
52              
53             sub _try_rx {
54             my $self = shift();
55             my $msg;
56             try {
57             $msg = $self->_zmq_recv(ZMQ_NOBLOCK);
58             };
59             if ($msg) {
60             $self->output_to->consume($msg);
61             }
62             return $msg;
63             }
64              
65             has _io_reader => (
66             is => 'ro',
67             lazy => 1,
68             default => sub {
69             my $weak_self = shift;
70             weaken($weak_self);
71             AE::io $weak_self->_socket->get_fd, 0,
72             sub { my $more; do { $more = $weak_self->_try_rx } while ($more) };
73             },
74             );
75              
76             # Note that we need this timer as ZMQ is magic..
77             # Just checking our local FD for readability will not always
78             # be enough, as the client end of ZQM may not start pushing messages to us,
79             # ergo we call ->recv explicitly on the socket to get messages
80             # which may be pre-buffered at a client as fast as possible (i.e. before
81             # the client pushes another message).
82             has _zmq_timer => (
83             is => 'ro',
84             lazy => 1,
85             default => sub {
86             my $weak_self = shift;
87             weaken($weak_self);
88             AnyEvent->timer(after => 1, interval => 1,
89             cb => sub { my $more; do { $more = $weak_self->_try_rx } while ($more) });
90             },
91             );
92              
93             sub BUILD {
94             my $self = shift;
95             $self->_io_reader;
96             $self->_zmq_timer;
97             }
98              
99             1;
100              
101             =head1 NAME
102              
103             Message::Passing::Input::ZeroMQ - input messages from ZeroMQ.
104              
105             =head1 SYNOPSIS
106              
107             message-passing --output STDOUT --input ZeroMQ --input_options '{"socket_bind":"tcp://*:5552"}'
108              
109             =head1 DESCRIPTION
110              
111             A L ZeroMQ input class.
112              
113             Can be used as part of a chain of classes with the L utility, or directly as
114             an input with L.
115              
116             =head1 ATTRIBUTES
117              
118             See L
119              
120             =head2 subscribe
121              
122             If the input socket is a C socket, then the C
123             socket option will be set once for each value in the subscribe attribute.
124              
125             Defaults to '', which means all messages are subscribed to.
126              
127             =head1 SEE ALSO
128              
129             =over
130              
131             =item L
132              
133             =item L
134              
135             =item L
136              
137             =item L
138              
139             =item L
140              
141             =back
142              
143             =head1 SPONSORSHIP
144              
145             This module exists due to the wonderful people at Suretec Systems Ltd.
146             who sponsored its development for its
147             VoIP division called SureVoIP for use with
148             the SureVoIP API -
149            
150              
151             =head1 AUTHOR, COPYRIGHT AND LICENSE
152              
153             See L.
154              
155             =cut
156