File Coverage

blib/lib/Message/Passing/ZeroMQ.pm
Criterion Covered Total %
statement 7 9 77.7
branch n/a
condition n/a
subroutine 3 3 100.0
pod n/a
total 10 12 83.3


line stmt bran cond sub pod time code
1             package Message::Passing::ZeroMQ;
2 1     1   1185 use strict;
  1         3  
  1         52  
3 1     1   7 use warnings;
  1         2  
  1         44  
4 1     1   1852 use ZeroMQ qw/ :all /;
  0            
  0            
5             use POSIX::AtFork ();
6             use Sub::Name;
7             use namespace::clean -except => 'meta';
8              
9             our $VERSION = "0.007";
10             $VERSION = eval $VERSION;
11              
12             our @_WITH_CONTEXTS;
13              
14             POSIX::AtFork->add_to_prepare(subname at_fork => sub {
15             foreach my $thing (grep { defined $_ } @_WITH_CONTEXTS) {
16             $thing->_clear_ctx;
17             }
18             @_WITH_CONTEXTS = ();
19             });
20              
21             1;
22              
23             =head1 NAME
24              
25             Message::Passing::ZeroMQ - input and output messages to ZeroMQ.
26              
27             =head1 SYNOPSIS
28              
29             # Terminal 1:
30             $ message-passing --input STDIN --output ZeroMQ --output_options '{"connect":"tcp://127.0.0.1:5552"}'
31             {"data":{"some":"data"},"@metadata":"value"}
32              
33             # Terminal 2:
34             $ message-passing --output STDOUT --input ZeroMQ --input_options '{"socket_bind":"tcp://*:5552"}'
35             {"data":{"some":"data"},"@metadata":"value"}
36              
37             =head1 DESCRIPTION
38              
39             A L transport for L.
40              
41             Designed for use as a log transport and aggregation mechanism for perl applications, allowing you
42             to aggregate structured and non-structured log messages across the network in a non-blocking manor.
43              
44             Clients (I.e. users of the L class) connect to a server (I.e. a user of the
45             L class) via ZeroMQ's pub/sub sockets. These are setup to be lossy and non-blocking,
46             meaning that if the log-receiver process is down or slow, then the application will queue a small (and configurable)
47             amount of logs on it's side, and after that log messages will be dropped.
48              
49             Whilst throwing away log messages isn't a good thing to do, or something that you want to happen regularly,
50             in many (especially web application) contexts, network logging being a single point of failure is
51             not acceptable from a reliability and graceful degradation standpoint.
52              
53             The application grinding to a halt as a non-essential centralised resource is unavailable (e.g. the log aggregation
54             server) is significantly less acceptable than the loss of non-essential logging data.
55              
56             =head1 HOW TO USE
57              
58             In your application emitting messages, you can either use L directly,
59             or you can use it via L.
60              
61             use Log::Dispatch;
62             use Log::Dispatch::Message::Passing;
63             use Message::Passing::Output::ZeroMQ;
64             use Message::Passing::Filter::Encode::JSON;
65              
66             my $log = Log::Dispatch->new;
67              
68             $log->add(Log::Dispatch::Message::Passing->new(
69             name => 'myapp_aggregate_log',
70             min_level => 'debug',
71             output => Message::Passing::Filter::Encode::JSON->new(
72             output_to => Message::Passing::Output::ZeroMQ->new(
73             connect => 'tcp://192.168.0.1:5558',
74             )
75             ),
76             ));
77              
78             $log->warn($_) for qw/ foo bar baz /;
79              
80             On your log aggregation server, just run the message-passing utility:
81              
82             message-passing --input ZeroMQ --input_options '{"socket_bind":"tcp://*:5222"}' \
83             --output File --output_options '{"filename":"/tmp/my_test.log"}'
84              
85             =head1 SOCKET TYPES
86              
87             ZeroMQ supports multiple socket types, the only ones used in Message::Passing::ZeroMQ are:
88              
89             =head2 PUB/SUB
90              
91             Used for general message distribution - you can have either multiple producers (PUB)
92             which connect to one consumer (SUB), or multiple consumers (SUB) which connect to one
93             producer (PUB).
94              
95             All consumers will get a copy of every message.
96              
97             In Message::Passing terms, L is for SUB sockets, and
98             L is for PUB sockets.
99              
100             =head2 PUSH/PULL
101              
102             Used for message distribution. A sever (PUSH) distributes messages between
103             a number of connecting clients (PULL)
104              
105             In Message::Passing terms, L is for PULL sockets, and
106             L is for PUSH sockets.
107              
108             =head1 CONNECTION DIRECTION
109              
110             Note that in ZeroMQ, the connection direction and the direction of message flow can be
111             entirely opposite. I.e. a client can connect to a server and send messages to it, or
112             receive messages from it (depending on the direction of the socket types).
113              
114             =head1 CONNECTION ATTRIBUTES
115              
116             Both L and L support
117             either binding a server or connecting to a remote host, due to the fact that ZeroMQ connections
118             can be in any direction, as noted above.
119              
120             Therefore, each input or output should have one (but not both!) of the following attributes:
121              
122             =head2 connect
123              
124             Connects to a remote server, e.g. C<< tcp://192.168.0.1:5222 >>
125              
126             =head2 socket_bind
127              
128             Binds a server and waits for connections from clients, e.g. C<< tcp://*:5222 >>
129              
130             =head2 socket_type
131              
132             This defaults to C for L and C for
133             L, however you can override it to C/C as
134             appropriate for your use case if desired.
135              
136             =head1 MORE COMPLEX EXAMPLES
137              
138             With this in mind, we can easily create a system which aggregates messages from
139             multiple publishers, and passes them out (in a round-robin fashion) to a pool of workers.
140              
141             # The message distributor:
142             message-passing --input ZeroMQ --input_options '{"socket_bind":"tcp://*:5222"}' \
143             --output ZeroMQ --output_options '{"socket_bind":"tcp://*:5223","socket_type":"PUSH"}'
144              
145             # Workers
146             {
147             package MyApp::MessageWorker;
148             use Moo;
149              
150             with 'Message::Passing::Role::Filter';
151              
152             sub filter {
153             my ($self, $message) = @_;
154             # .... process the message in any way you want here
155             return undef; # Do not output the message..
156             }
157             }
158              
159             message-passing --input ZeroMQ --input_options '{"connect":"tcp://127.0.0.1:5223","socket_type":"PULL"}'
160             --filter '+MyApp::MessageWorker'
161             --output STDOUT
162              
163             You log messages into the distributor as per the above simple example, and you can run multiple worker
164             processes..
165              
166             Less trivial setups could/would emit messages on error, or maybe re-emit the incoming message after transforming it
167             in some way.
168              
169             =head1 SEE ALSO
170              
171             For more detailed information about ZeroMQ and how it works, please consult the ZeroMQ guide and the other links below:
172              
173             =over
174              
175             =item L
176              
177             =item L
178              
179             =item L
180              
181             =item L
182              
183             =item L
184              
185             =item L
186              
187             =back
188              
189             =head1 AUTHOR
190              
191             Tomas (t0m) Doran
192              
193             =head1 SPONSORSHIP
194              
195             This module exists due to the wonderful people at Suretec Systems Ltd.
196             who sponsored its development for its
197             VoIP division called SureVoIP for use with
198             the SureVoIP API -
199            
200              
201             =head1 COPYRIGHT
202              
203             Copyright Suretec Systems 2012.
204              
205             =head1 LICENSE
206              
207             GNU Affero General Public License, Version 3
208              
209             If you feel this is too restrictive to be able to use this software,
210             please talk to us as we'd be willing to consider re-licensing under
211             less restrictive terms.
212              
213             =cut
214              
215             1;
216