File Coverage

blib/lib/Message/Passing/Output/ZeroMQ.pm
Criterion Covered Total %
statement 10 12 83.3
branch n/a
condition n/a
subroutine 4 4 100.0
pod n/a
total 14 16 87.5


line stmt bran cond sub pod time code
1             package Message::Passing::Output::ZeroMQ;
2 2     2   63682 use Moo;
  2         10569  
  2         11  
3 2     2   3080 use MooX::Types::MooseLike::Base qw/ :all /;
  2         13250  
  2         924  
4 2     2   586 use namespace::clean -except => 'meta';
  2         10745  
  2         19  
5              
6 2     2   1808 use ZMQ::FFI::Constants qw/ :all /;
  0            
  0            
7             use Time::HiRes;
8              
9             with qw/
10             Message::Passing::ZeroMQ::Role::HasASocket
11             Message::Passing::Role::Output
12             /;
13              
14             has '+_socket' => (
15             handles => {
16             '_zmq_send' => 'send',
17             },
18             );
19              
20             sub _socket_type { 'PUB' }
21              
22             has socket_hwm => (
23             is => 'rw',
24             default => 10000,
25             );
26              
27             has subscribe_delay => (
28             is => 'ro',
29             isa => Num,
30             default => 0.2,
31             );
32              
33             # socket_(probably)_subscribed, but who has the bytes for that
34             has socket_subscribed => (
35             is => 'rw',
36             isa => Bool,
37             );
38             has socket_connect_time => (
39             is => 'rw',
40             isa => Num,
41             );
42              
43             sub BUILD {
44             my $self = shift;
45             # Force a socket to be built, so that there's more chance the first message will be sent
46             if ($self->_should_connect){
47             my $socket = $self->_socket;
48             return;
49             }
50              
51             return;
52             }
53              
54             sub consume {
55             my ($self, $data) = @_;
56              
57             # See the slow joiner problem for PUB/SUB, outlined in
58             # http://zguide.zeromq.org/page:all#Getting-the-Message-Out
59             if (!$self->socket_subscribed && $self->socket_connect_time){
60             my $time = Time::HiRes::time;
61             my $alive_time = $time - $self->socket_connect_time;
62             my $sleep_time = sprintf "%.4f", ($self->subscribe_delay - $alive_time);
63             # warn "Alive $alive_time, so sleep time $sleep_time";
64             if ($sleep_time > 0){
65             Time::HiRes::sleep $sleep_time;
66             }
67             $self->socket_subscribed(1);
68             }
69              
70             return $self->_zmq_send($data);
71             }
72              
73             sub setsockopt {
74             my ($self, $socket) = @_;
75              
76             if ($self->zmq_major_version >= 3){
77             $socket->set(ZMQ_SNDHWM, 'int', $self->socket_hwm);
78             }
79             else {
80             $socket->set(ZMQ_HWM, 'uint64_t', $self->socket_hwm);
81             }
82              
83             return;
84             }
85              
86             after _build_socket => sub {
87             my $self = shift;
88             $self->socket_connect_time( Time::HiRes::time );
89             };
90              
91             1;
92              
93             =head1 NAME
94              
95             Message::Passing::Output::ZeroMQ - output messages to ZeroMQ.
96              
97             =head1 SYNOPSIS
98              
99             use Message::Passing::Output::ZeroMQ;
100              
101             my $logger = Message::Passing::Output::ZeroMQ->new;
102             $logger->consume({data => { some => 'data'}, '@metadata' => 'value' });
103              
104             # Or see Log::Dispatch::Message::Passing for a more 'normal' interface to
105             # simple logging.
106              
107             # Or use directly on command line:
108             message-passing --input STDIN --output ZeroMQ --output_options \
109             '{"connect":"tcp://192.168.0.1:5552"}'
110             {"data":{"some":"data"},"@metadata":"value"}
111              
112             =head1 DESCRIPTION
113              
114             A L<Message::Passing> ZeroMQ output class.
115              
116             Can be used as part of a chain of classes with the L<message-passing> utility, or directly as
117             a logger in normal perl applications.
118              
119             =head1 ATTRIBUTES
120              
121              
122             See L<Message::Passing::ZeroMQ/CONNECTION ATTRIBUTES>.
123              
124             =head2 subscribe_delay
125              
126             Time in floating seconds to sleep to ensure the receiving socket has subscribed.
127             This is the longest the sleep might take.
128              
129             See the slow-joiner problem: L<http://zguide.zeromq.org/page:all#Getting-the-Message-Out>.
130              
131             DEFAULT: 0.2 seconds
132              
133             =head1 METHODS
134              
135             =head2 consume ($msg)
136              
137             Sends a message, as-is. This means that you must have encoded the message to a string before
138             sending it. The C<message-pass> utility will do this for you into JSON, or you can
139             do it manually as shown in the example in L<Message::Passing::ZeroMQ>.
140              
141             =head1 SEE ALSO
142              
143             =over
144              
145             =item L<Message::Passing::ZeroMQ>
146              
147             =item L<Message::Passing::Input::ZeroMQ>
148              
149             =item L<Message::Passing>
150              
151             =item L<ZeroMQ>
152              
153             =item L<http://www.zeromq.org/>
154              
155             =back
156              
157             =head1 SPONSORSHIP
158              
159             This module exists due to the wonderful people at Suretec Systems Ltd.
160             <http://www.suretecsystems.com/> who sponsored its development for its
161             VoIP division called SureVoIP <http://www.surevoip.co.uk/> for use with
162             the SureVoIP API -
163             <http://www.surevoip.co.uk/support/wiki/api_documentation>
164              
165             =head1 AUTHOR, COPYRIGHT AND LICENSE
166              
167             See L<Message::Passing>.
168              
169             =cut
170