File Coverage

blib/lib/Message/Passing.pm
Criterion Covered Total %
statement 42 42 100.0
branch n/a
condition n/a
subroutine 11 11 100.0
pod 1 2 50.0
total 54 55 98.1


line stmt bran cond sub pod time code
1             package Message::Passing;
2 3     3   4755 use Moo;
  3         23432  
  3         21  
3 3     3   4757 use Config::Any;
  3         28405  
  3         103  
4 3     3   1445 use Message::Passing::Role::CLIComponent;
  3         10  
  3         23  
5 3     3   1540 use Message::Passing::DSL;
  3         11  
  3         263  
6 3     3   23 use Carp qw/ confess /;
  3         12  
  3         190  
7 3     3   22 use MooX::Options flavour => [qw( pass_through )], protect_argv => 0;
  3         6  
  3         37  
8 3     3   244463 use namespace::clean -except => [qw/ meta new_with_options parse_options _options_data _options_config/];
  3         9  
  3         45  
9 3     3   1845 use 5.008004;
  3         11  
10              
11             our $VERSION = '0.117';
12             $VERSION = eval $VERSION;
13              
14             around 'parse_options' => sub {
15             my $orig = shift;
16             my $class = shift;
17             my %args = $orig->($class, @_);
18              
19             if (my $conf = $args{configfile}) {
20             my $cfg = $class->get_config_from_file($conf);
21             foreach my $k (keys %$cfg) {
22             if (!exists $args{$k}) {
23             $args{$k} = $cfg->{$k};
24             }
25             }
26             }
27              
28             return %args;
29             };
30              
31              
32             with
33             CLIComponent( name => 'input' ),
34             CLIComponent( name => 'output' ),
35             CLIComponent( name => 'filter', default => 'Null' ),
36             CLIComponent( name => 'decoder', default => 'JSON' ),
37             CLIComponent( name => 'encoder', default => 'JSON' ),
38             CLIComponent( name => 'error', default => 'STDERR' ),
39             CLIComponent( name => 'error_encoder', default => 'Message::Passing::Filter::Encoder::JSON' ),
40             'Message::Passing::Role::Script';
41              
42             option configfile => (
43             is => 'ro',
44             format => 's',
45             );
46              
47             sub get_config_from_file {
48 1     1 0 3 my ($class, $filename) = @_;
49 1         1 my ($fn, $cfg) = %{ Config::Any->load_files({
  1         11  
50             files => [$filename],
51             use_ext => 1,
52             })->[0] };
53 1         11580 return $cfg;
54             }
55              
56             sub build_chain {
57 1     1 1 2506 my $self = shift;
58             message_chain {
59             error_log(
60 1         9 %{ $self->error_encoder_options },
61             class => $self->error_encoder,
62             output_to => output error => (
63 1     1   3 %{ $self->error_options },
  1         8  
64             class => $self->error,
65             ),
66             );
67             output output => (
68 1         49 %{ $self->output_options },
  1         14  
69             class => $self->output,
70             );
71             encoder("encoder",
72 1         4 %{ $self->encoder_options },
  1         14  
73             class => $self->encoder,
74             output_to => 'output',
75             );
76             filter filter => (
77 1         3 %{ $self->filter_options },
  1         12  
78             class => $self->filter,
79             output_to => 'encoder',
80             );
81             decoder("decoder",
82 1         3 %{ $self->decoder_options },
  1         11  
83             class => $self->decoder,
84             output_to => 'filter',
85             );
86             input input => (
87 1         3 %{ $self->input_options },
  1         14  
88             class => $self->input,
89             output_to => 'decoder',
90             );
91 1         11 };
92             }
93              
94             1;
95              
96             =head1 NAME
97              
98             Message::Passing - a simple way of doing messaging.
99              
100             =head1 SYNOPSIS
101              
102             message-pass --input STDIN --output STDOUT
103             {"foo": "bar"}
104             {"foo":"bar"}
105              
106             =head1 DESCRIPTION
107              
108             A library for building high performance, loosely coupled and reliable/resilient applications,
109             structured as small services which communicate over the network by passing messages.
110              
111             =head2 BASIC PREMISE
112              
113             You have data for discrete events, represented by a hash (and
114             serialized as JSON).
115              
116             This could be a text log line, an audit record of an API
117             event, a metric emitted from your application that you wish
118             to aggregate and process - anything that can be a simple hash really.
119              
120             You want to be able to shove these events over the network easily,
121             and aggregate them / filter and rewrite them / split them into worker queues.
122              
123             This module is designed as a simple framework for writing components
124             that let you do all of these things, in a simple and easily extensible
125             manner.
126              
127             For a practical example, you generate events from a source (e.g.
128             L<ZeroMQ> output of logs and performance metrics from your L<Catalyst> FCGI
129             or L<Starman> workers) and run one script that will give you a central
130             application log file, or push the logs into Elasticsearch.
131              
132             There is a growing set of components you can plug together
133             to make your solution.
134              
135             Getting started is really easy - you can just use the C<message-pass>
136             command installed by the distribution. If you have a common config
137             that you want to repeat, or you want to write your own server
138             which does something more flexible than the normal script allows,
139             then see L<Message::Passing::DSL>.
140              
141             To dive straight in, see the documentation for the command line utility
142             L<message-pass>, and see the examples in L<Message::Passing::Manual::Cookbook>.
143              
144             For more about how the system works, see L<Message::Passing::Manual::Concepts>.
145              
146             =head1 COMPONENTS
147              
148             Below is a non-exhaustive list of components available.
149              
150             =head2 INPUTS
151              
152             Inputs receive data from a source (usually a network protocol).
153              
154             They are responsible for decoding the data into a hash before passing
155             it on to the next stage.
156              
157             Inputs include:
158              
159             =over
160              
161             =item L<Message::Passing::Input::STDIN>
162              
163             =item L<Message::Passing::Input::ZeroMQ>
164              
165             =item L<Message::Passing::Input::STOMP>
166              
167             =item L<Message::Passing::Input::AMQP>
168              
169             =item L<Message::Passing::Input::Syslog>
170              
171             =item L<Message::Passing::Input::Redis>
172              
173             =item L<Message::Passing::Input::Test>
174              
175             =back
176              
177             You can easily write your own input, just use L<AnyEvent>, and
178             consume L<Message::Passing::Role::Input>.
179              
180             =head2 FILTER
181              
182             Filters can transform a message in any way.
183              
184             Examples include:
185              
186             =over
187              
188             =item L<Message::Passing::Filter::Null> - Returns the input unchanged.
189              
190             =item L<Message::Passing::Filter::All> - Stops any messages it receives from being passed to the output. I.e. literally filters all input out.
191              
192             =item L<Message::Passing::Filter::T> - Splits the incoming message to multiple outputs.
193              
194             =back
195              
196             You can easily write your own filter, just consume
197             L<Message::Passing::Role::Filter>.
198              
199             Note that filters can be chained, and a filter can return undef to
200             stop a message being passed to the output.
201              
202             =head2 OUTPUTS
203              
204             Outputs send data to somewhere, i.e. they consume messages.
205              
206             =over
207              
208             =item L<Message::Passing::Output::STDOUT>
209              
210             =item L<Message::Passing::Output::AMQP>
211              
212             =item L<Message::Passing::Output::STOMP>
213              
214             =item L<Message::Passing::Output::ZeroMQ>
215              
216             =item L<Message::Passing::Output::WebHooks>
217              
218             =item L<Message::Passing::Output::Search::Elasticsearch>
219              
220             =item L<Message::Passing::Output::Redis>
221              
222             =item L<Message::Passing::Output::Test>
223              
224             =back
225              
226             =head1 SEE ALSO
227              
228             =over
229              
230             =item L<Message::Passing::Manual> - The manual (contributions cherished).
231              
232             =item L<http://www.slideshare.net/bobtfish/messaging-interoperability-and-log-aggregation-a-new-framework> - Slide deck!
233              
234             =item L<Log::Message::Structured> - For creating your log messages.
235              
236             =item L<Log::Dispatch::Message::Passing> - use Message::Passing outputs from L<Log::Dispatch>.
237              
238             =back
239              
240             =head1 THIS MODULE
241              
242             This is a simple L<MooX::Options> script, with one input, one filter
243             and one output. To build your own similar scripts, see:
244              
245             =over
246              
247             =item L<Message::Passing::DSL> - To declare your message chains.
248              
249             =item L<Message::Passing::Role::CLIComponent> - To provide C<foo> and C<foo_options> attribute pairs.
250              
251             =item L<Message::Passing::Role::Script> - To provide daemonization features.
252              
253             =back
254              
255             =head2 METHODS
256              
257             =head3 build_chain
258              
259             Builds and returns the configured chain of input => decoder => filter => encoder => output.
260              
261             =head3 start
262              
263             Class method to call the run_message_server function with the results of
264             having constructed an instance of this class, parsed command line options
265             and constructed a chain.
266              
267             This is the entry point for the script.
268              
269             =head1 AUTHOR
270              
271             Tomas (t0m) Doran <bobtfish@bobtfish.net>
272              
273             =head1 SUPPORT
274              
275             =head2 Bugs
276              
277             Please log bugs at L<rt.cpan.org>. Each distribution has a bug tracker
278             link in its L<metacpan.org> page.
279              
280             =head2 Discussion
281              
282             L<#message-passing> on L<irc.perl.org>.
283              
284             =head2 Source code
285              
286             Source code for all modules is available at L<http://github.com/suretec>
287             and forks/patches are very welcome.
288              
289             =head1 SPONSORSHIP
290              
291             This module exists due to the wonderful people at Suretec Systems Ltd.
292             <http://www.suretecsystems.com/> who sponsored its development for its
293             VoIP division called SureVoIP <http://www.surevoip.co.uk/> for use with
294             the SureVoIP API -
295             <http://www.surevoip.co.uk/support/wiki/api_documentation>
296              
297             =head1 COPYRIGHT
298              
299             Copyright Suretec Systems Ltd. 2012.
300              
301             Logstash (upon which many ideas for this project are based, but
302             from which we do not reuse any code) is copyright 2010 Jordan Sissel.
303              
304             =head1 LICENSE
305              
306             GNU Library General Public License, Version 2.1
307              
308             =cut
309