File Coverage

blib/lib/Message/Passing/Output/ZeroMQ.pm
Criterion Covered Total %
statement 10 12 83.3
branch n/a
condition n/a
subroutine 4 4 100.0
pod n/a
total 14 16 87.5


line stmt bran cond sub pod time code
1             package Message::Passing::Output::ZeroMQ;
2 2     2   24138 use Moo;
  2         15378  
  2         16  
3 2     2   4199 use MooX::Types::MooseLike::Base qw/ :all /;
  2         14680  
  2         972  
4 2     2   825 use namespace::clean -except => 'meta';
  2         14302  
  2         54  
5              
6 2     2   2133 use ZMQ::FFI::Constants qw/ :all /;
  0            
  0            
7             use Time::HiRes;
8              
9             with qw/
10             Message::Passing::ZeroMQ::Role::HasASocket
11             Message::Passing::Role::Output
12             /;
13              
14             has '+_socket' => (
15             handles => {
16             '_zmq_send' => 'send',
17             },
18             );
19              
20             sub _socket_type { 'PUB' }
21              
22             has socket_hwm => (
23             is => 'rw',
24             default => 10000,
25             );
26              
27             has subscribe_delay => (
28             is => 'ro',
29             isa => Num,
30             default => 0.2,
31             );
32              
33             # socket_(probably)_subscribed, but who has the bytes for that
34             has socket_subscribed => (
35             is => 'rw',
36             isa => Bool,
37             );
38             has socket_connect_time => (
39             is => 'rw',
40             isa => Num,
41             );
42              
43             sub BUILD {
44             my $self = shift;
45             # Force a socket to be built, so that there's more chance the first message will be sent
46             if ($self->_should_connect){
47             my $socket = $self->_socket;
48             return;
49             }
50              
51             return;
52             }
53              
54             sub consume {
55             my ($self, $data) = @_;
56              
57             # See the slow joiner problem for PUB/SUB, outlined in
58             # http://zguide.zeromq.org/page:all#Getting-the-Message-Out
59             if (!$self->socket_subscribed && $self->socket_connect_time){
60             my $time = Time::HiRes::time;
61             my $alive_time = $time - $self->socket_connect_time;
62             my $sleep_time = sprintf "%.4f", ($self->subscribe_delay - $alive_time);
63             # warn "Alive $alive_time, so sleep time $sleep_time";
64             if ($sleep_time > 0){
65             Time::HiRes::sleep $sleep_time;
66             }
67             $self->socket_subscribed(1);
68             }
69              
70             return $self->_zmq_send($data);
71             }
72              
73             sub setsockopt {
74             my ($self, $socket) = @_;
75              
76             if ($self->zmq_major_version >= 3){
77             $socket->set(ZMQ_SNDHWM, 'int', $self->socket_hwm);
78             }
79             else {
80             $socket->set(ZMQ_HWM, 'uint64_t', $self->socket_hwm);
81             }
82              
83             return;
84             }
85              
86             after _build_socket => sub {
87             my $self = shift;
88             $self->socket_connect_time( Time::HiRes::time );
89             };
90              
91             1;
92              
93             =head1 NAME
94              
95             Message::Passing::Output::ZeroMQ - output messages to ZeroMQ.
96              
97             =head1 SYNOPSIS
98              
99             use Message::Passing::Output::ZeroMQ;
100              
101             my $logger = Message::Passing::Output::ZeroMQ->new;
102             $logger->consume({data => { some => 'data'}, '@metadata' => 'value' });
103              
104             # Or see Log::Dispatch::Message::Passing for a more 'normal' interface to
105             # simple logging.
106              
107             # Or use directly on command line:
108             message-passing --input STDIN --output ZeroMQ --output_options \
109             '{"connect":"tcp://192.168.0.1:5552"}'
110             {"data":{"some":"data"},"@metadata":"value"}
111              
112             =head1 DESCRIPTION
113              
114             A L<Message::Passing> ZeroMQ output class.
115              
116             Can be used as part of a chain of classes with the L<message-pass> utility, or directly as
117             a logger in normal perl applications.
118              
119             =head1 ATTRIBUTES
120              
121              
122             See L<Message::Passing::ZeroMQ> for the connection attributes shared by all ZeroMQ inputs and outputs.
123              
124             =head2 subscribe_delay
125              
126             Time in floating seconds to sleep to ensure the receiving socket has subscribed.
127             This is the longest the sleep might take.
128              
129             See the slow-joiner problem: L<http://zguide.zeromq.org/page:all#Getting-the-Message-Out>.
130              
131             DEFAULT: 0.2 seconds
132              
133             =head1 METHODS
134              
135             =head2 consume ($msg)
136              
137             Sends a message, as-is. This means that you must have encoded the message to a string before
138             sending it. The C<message-pass> utility will do this for you into JSON, or you can
139             do it manually as shown in the example in L<Message::Passing::ZeroMQ>.
140              
141             =head1 SEE ALSO
142              
143             =over
144              
145             =item L<Message::Passing::ZeroMQ>
146              
147             =item L<Message::Passing::Input::ZeroMQ>
148              
149             =item L<Message::Passing>
150              
151             =item L<ZMQ::FFI>
152              
153             =item L<http://www.zeromq.org/>
154              
155             =back
156              
157             =head1 SPONSORSHIP
158              
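159             This module exists due to the wonderful people at Suretec Systems Ltd.
160             <http://www.suretecsystems.com/> who sponsored its development for its
161             VoIP division called SureVoIP <http://www.surevoip.co.uk/> for use with
162             the SureVoIP API -
163             <http://www.surevoip.co.uk/support/wiki/api_documentation>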
164              
165             =head1 AUTHOR, COPYRIGHT AND LICENSE
166              
167             See L<Message::Passing::ZeroMQ>.
168              
169             =cut
170