File Coverage

blib/lib/Message/Passing/Input/ZeroMQ.pm
Criterion Covered Total %
statement 4 6 66.6
branch n/a
condition n/a
subroutine 2 2 100.0
pod n/a
total 6 8 75.0


line stmt bran cond sub pod time code
1             package Message::Passing::Input::ZeroMQ;
2 5     5   190215 use Moo;
  5         115065  
  5         36  
3 5     5   19622 use ZeroMQ qw/:all/;
  0            
  0            
4             use AnyEvent;
5             use Scalar::Util qw/ weaken /;
6             use Try::Tiny qw/ try catch /;
7             use namespace::clean -except => 'meta';
8              
9             with qw/
10             Message::Passing::ZeroMQ::Role::HasASocket
11             Message::Passing::Role::Input
12             /;
13              
14             has '+_socket' => (
15             handles => {
16             _zmq_recv => 'recv',
17             },
18             );
19              
20             sub _socket_type { 'SUB' }
21              
22             sub _build_socket_hwm { 100000 }
23             sub _build_socket_swap { 0 }
24              
25             has subscribe => (
26             isa => sub { ref($_[0]) eq 'ARRAY' },
27             is => 'ro',
28             lazy => 1,
29             default => sub { [ '' ] }, # Subscribe to everything!
30             );
31              
32             after setsockopt => sub {
33             my ($self, $socket) = @_;
34             if ($self->socket_type eq 'SUB') {
35             foreach my $sub (@{ $self->subscribe }) {
36             $socket->setsockopt(ZMQ_SUBSCRIBE, $sub);
37             }
38             }
39             };
40              
41             sub _try_rx {
42             my $self = shift();
43             my $msg = $self->_zmq_recv(ZMQ_NOBLOCK);
44             if ($msg) {
45             $self->output_to->consume($msg->data);
46             }
47             return $msg;
48             }
49              
50             has _io_reader => (
51             is => 'ro',
52             lazy => 1,
53             default => sub {
54             my $weak_self = shift;
55             weaken($weak_self);
56             AE::io $weak_self->_socket->getsockopt( ZMQ_FD ), 0,
57             sub { my $more; do { $more = $weak_self->_try_rx } while ($more) };
58             },
59             );
60              
61             # Note that we need this timer as ZMQ is magic.
62             # Just checking our local FD for readability will not always
63             # be enough, as the client end of ZMQ may not start pushing messages to us,
64             # ergo we call ->recv explicitly on the socket to get messages
65             # which may be pre-buffered at a client as fast as possible (i.e. before
66             # the client pushes another message).
67             has _zmq_timer => (
68             is => 'ro',
69             lazy => 1,
70             default => sub {
71             my $weak_self = shift;
72             weaken($weak_self);
73             AnyEvent->timer(after => 1, interval => 1,
74             cb => sub { my $more; do { $more = $weak_self->_try_rx } while ($more) });
75             },
76             );
77              
78             sub BUILD {
79             my $self = shift;
80             $self->_io_reader;
81             $self->_zmq_timer;
82             }
83              
84             1;
85              
86             =head1 NAME
87              
88             Message::Passing::Input::ZeroMQ - input messages from ZeroMQ.
89              
90             =head1 SYNOPSIS
91              
92             message-passing --output STDOUT --input ZeroMQ --input_options '{"socket_bind":"tcp://*:5552"}'
93              
94             =head1 DESCRIPTION
95              
96             A L<Message::Passing> ZeroMQ input class.
97              
98             Can be used as part of a chain of classes with the L<message-passing> utility, or directly as
99             an input with L<Message::Passing::DSL>.
100              
101             =head1 ATTRIBUTES
102              
103             See L<Message::Passing::ZeroMQ/CONNECTION ATTRIBUTES>
104              
105             =head2 subscribe
106              
107             If the input socket is a C<SUB> socket, then the C<ZMQ_SUBSCRIBE>
108             socket option will be set once for each value in the subscribe attribute.
109              
110             Defaults to '', which means all messages are subscribed to.
111              
112             =head1 SEE ALSO
113              
114             =over
115              
116             =item L<Message::Passing::ZeroMQ>
117              
118             =item L<Message::Passing::Output::ZeroMQ>
119              
120             =item L<Message::Passing>
121              
122             =item L<ZeroMQ>
123              
124             =item L<http://www.zeromq.org/>
125              
126             =back
127              
128             =head1 SPONSORSHIP
129              
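130             This module exists due to the wonderful people at Suretec Systems Ltd.
131             <http://www.suretecsystems.com/> who sponsored its development for its
132             VoIP division called SureVoIP <http://www.surevoip.co.uk/> for use with
133             the SureVoIP API - <http://www.surevoip.co.uk/support/wiki/api_documentation>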
134            
135              
136             =head1 AUTHOR, COPYRIGHT AND LICENSE
137              
138             See L<Message::Passing::ZeroMQ>.
139              
140             =cut
141