File Coverage

blib/lib/Message/Passing/ZeroMQ.pm
Criterion Covered Total %
statement 15 15 100.0
branch n/a
condition n/a
subroutine 5 5 100.0
pod n/a
total 20 20 100.0


line stmt bran cond sub pod time code
1             package Message::Passing::ZeroMQ;
2 1     1   656 use strict;
  1         2  
  1         24  
3 1     1   5 use warnings;
  1         2  
  1         27  
4 1     1   688 use POSIX::AtFork ();
  1         530  
  1         23  
5 1     1   697 use Sub::Name;
  1         492  
  1         61  
6 1     1   829 use namespace::clean -except => 'meta';
  1         36081  
  1         7  
7              
8             our $VERSION = "0.008";
9             $VERSION = eval $VERSION;
10              
11             our @_WITH_CONTEXTS;
12              
13             POSIX::AtFork->add_to_prepare(subname at_fork => sub {
14             foreach my $thing (grep { defined $_ } @_WITH_CONTEXTS) {
15             $thing->_clear_ctx;
16             }
17             @_WITH_CONTEXTS = ();
18             });
19              
20             1;
21              
22             =head1 NAME
23              
24             Message::Passing::ZeroMQ - input and output messages to ZeroMQ.
25              
26             =head1 SYNOPSIS
27              
28             # Terminal 1:
29             $ message-passing --input STDIN --output ZeroMQ --output_options '{"connect":"tcp://127.0.0.1:5552"}'
30             {"data":{"some":"data"},"@metadata":"value"}
31              
32             # Terminal 2:
33             $ message-passing --output STDOUT --input ZeroMQ --input_options '{"socket_bind":"tcp://*:5552"}'
34             {"data":{"some":"data"},"@metadata":"value"}
35              
36             =head1 DESCRIPTION
37              
38             A L<ZeroMQ> transport for L<Message::Passing>.
39              
40             Designed for use as a log transport and aggregation mechanism for perl applications, allowing you
41             to aggregate structured and non-structured log messages across the network in a non-blocking manner.
42              
43             Clients (i.e. users of the L<Message::Passing::Output::ZeroMQ> class) connect to a server (i.e. a user of the
44             L<Message::Passing::Input::ZeroMQ> class) via ZeroMQ's pub/sub sockets. These are set up to be lossy and non-blocking,
45             meaning that if the log-receiver process is down or slow, then the application will queue a small (and configurable)
46             amount of logs on its side, and after that log messages will be dropped.
47              
48             Whilst throwing away log messages isn't a good thing to do, or something that you want to happen regularly,
49             in many (especially web application) contexts, network logging being a single point of failure is
50             not acceptable from a reliability and graceful degradation standpoint.
51              
52             The application grinding to a halt as a non-essential centralised resource is unavailable (e.g. the log aggregation
53             server) is significantly less acceptable than the loss of non-essential logging data.
54              
55             =head1 HOW TO USE
56              
57             In your application emitting messages, you can either use L<Message::Passing::Output::ZeroMQ> directly,
58             or you can use it via L<Log::Dispatch::Message::Passing>.
59              
60             use Log::Dispatch;
61             use Log::Dispatch::Message::Passing;
62             use Message::Passing::Output::ZeroMQ;
63             use Message::Passing::Filter::Encode::JSON;
64              
65             my $log = Log::Dispatch->new;
66              
67             $log->add(Log::Dispatch::Message::Passing->new(
68             name => 'myapp_aggregate_log',
69             min_level => 'debug',
70             output => Message::Passing::Filter::Encode::JSON->new(
71             output_to => Message::Passing::Output::ZeroMQ->new(
72             connect => 'tcp://192.168.0.1:5222',
73             )
74             ),
75             ));
76              
77             $log->warn($_) for qw/ foo bar baz /;
78              
79             On your log aggregation server, just run the message-passing utility:
80              
81             message-passing --input ZeroMQ --input_options '{"socket_bind":"tcp://*:5222"}' \
82             --output File --output_options '{"filename":"/tmp/my_test.log"}'
83              
84             =head1 SOCKET TYPES
85              
86             ZeroMQ supports multiple socket types, the only ones used in Message::Passing::ZeroMQ are:
87              
88             =head2 PUB/SUB
89              
90             Used for general message distribution - you can have either multiple producers (PUB)
91             which connect to one consumer (SUB), or multiple consumers (SUB) which connect to one
92             producer (PUB).
93              
94             All consumers will get a copy of every message.
95              
96             In Message::Passing terms, L<Message::Passing::Input::ZeroMQ> is for SUB sockets, and
97             L<Message::Passing::Output::ZeroMQ> is for PUB sockets.
98              
99             =head2 PUSH/PULL
100              
101             Used for message distribution. A server (PUSH) distributes messages between
102             a number of connecting clients (PULL).
103              
104             In Message::Passing terms, L<Message::Passing::Input::ZeroMQ> is for PULL sockets, and
105             L<Message::Passing::Output::ZeroMQ> is for PUSH sockets.
106              
107             =head1 CONNECTION DIRECTION
108              
109             Note that in ZeroMQ, the connection direction and the direction of message flow can be
110             entirely opposite. I.e. a client can connect to a server and send messages to it, or
111             receive messages from it (depending on the direction of the socket types).
112              
113             =head1 CONNECTION ATTRIBUTES
114              
115             Both L<Message::Passing::Input::ZeroMQ> and L<Message::Passing::Output::ZeroMQ> support
116             either binding a server or connecting to a remote host, due to the fact that ZeroMQ connections
117             can be in any direction, as noted above.
118              
119             Therefore, each input or output should have one (but not both!) of the following attributes:
120              
121             =head2 connect
122              
123             Connects to a remote server, e.g. C<< tcp://192.168.0.1:5222 >>
124              
125             =head2 socket_bind
126              
127             Binds a server and waits for connections from clients, e.g. C<< tcp://*:5222 >>
128              
129             =head2 socket_type
130              
131             This defaults to C<SUB> for L<Message::Passing::Input::ZeroMQ> and C<PUB> for
132             L<Message::Passing::Output::ZeroMQ>, however you can override it to C<PULL>/C<PUSH> as
133             appropriate for your use case if desired.
134              
135             =head1 MORE COMPLEX EXAMPLES
136              
137             With this in mind, we can easily create a system which aggregates messages from
138             multiple publishers, and passes them out (in a round-robin fashion) to a pool of workers.
139              
140             # The message distributor:
141             message-passing --input ZeroMQ --input_options '{"socket_bind":"tcp://*:5222"}' \
142             --output ZeroMQ --output_options '{"socket_bind":"tcp://*:5223","socket_type":"PUSH"}'
143              
144             # Workers
145             {
146             package MyApp::MessageWorker;
147             use Moo;
148              
149             with 'Message::Passing::Role::Filter';
150              
151             sub filter {
152             my ($self, $message) = @_;
153             # .... process the message in any way you want here
154             return undef; # Do not output the message..
155             }
156             }
157              
158             message-passing --input ZeroMQ --input_options '{"connect":"tcp://127.0.0.1:5223","socket_type":"PULL"}' \
159             --filter '+MyApp::MessageWorker' \
160             --output STDOUT
161              
162             You log messages into the distributor as per the above simple example, and you can run multiple worker
163             processes.
164              
165             Less trivial setups could/would emit messages on error, or maybe re-emit the incoming message after transforming it
166             in some way.
167              
168             =head1 SEE ALSO
169              
170             For more detailed information about ZeroMQ and how it works, please consult the ZeroMQ guide and the other links below:
171              
172             =over
173              
174             =item L
175              
176             =item L
177              
178             =item L
179              
180             =item L
181              
182             =item L
183              
184             =item L
185              
186             =back
187              
188             =head1 AUTHOR
189              
190             Tomas (t0m) Doran
191              
192             =head1 SPONSORSHIP
193              
194             This module exists due to the wonderful people at Suretec Systems Ltd.
195             who sponsored its development for its
196             VoIP division called SureVoIP for use with
197             the SureVoIP API -
198            
199              
200             =head1 COPYRIGHT
201              
202             Copyright Suretec Systems 2012.
203              
204             =head1 LICENSE
205              
206             GNU Affero General Public License, Version 3
207              
208             If you feel this is too restrictive to be able to use this software,
209             please talk to us as we'd be willing to consider re-licensing under
210             less restrictive terms.
211              
212             =cut
213              
214             1;
215