File Coverage

blib/lib/Message/Passing/Output/AMQP.pm
Criterion Covered Total %
statement 6 18 33.3
branch 0 8 0.0
condition 0 3 0.0
subroutine 2 3 66.6
pod n/a
total 8 32 25.0


line stmt bran cond sub pod time code
1             package Message::Passing::Output::AMQP;
2 1     1   2285 use Moose;
  1         2  
  1         8  
3 1     1   6963 use namespace::autoclean;
  1         2  
  1         11  
4              
5             with qw/
6             Message::Passing::AMQP::Role::DeclaresExchange
7             Message::Passing::Role::Output
8             /;
9              
10             has routing_key => (
11             isa => 'Str',
12             is => 'ro',
13             default => '',
14             );
15              
16             has header_cb => (
17             isa => 'CodeRef',
18             is => 'ro',
19             );
20              
21             has serialize_cb => (
22             isa => 'CodeRef',
23             is => 'ro',
24             );
25              
26             sub consume {
27 0     0     my $self = shift;
28 0           my $data = shift;
29 0 0 0       if (ref $data && ! defined $self->serialize_cb) {
30 0           warn("Passed non-serialized data - is a perl reference. Dropping.\n");
31 0           return;
32             }
33 0 0         unless ($self->_exchange) {
34 0           warn("No exchange yet, dropping message");
35 0           return;
36             }
37 0           my $header;
38 0 0         $header = $self->header_cb->($data)
39             if defined $self->header_cb;
40              
41 0 0         $data = $self->serialize_cb->($data)
42             if defined $self->serialize_cb;
43              
44 0           $self->_channel->publish(
45             body => $data,
46             header => $header,
47             exchange => $self->exchange_name,
48             routing_key => $self->routing_key,
49             );
50             }
51              
52             __PACKAGE__->meta->make_immutable;
53             1;
54              
55             =head1 NAME
56              
57             Message::Passing::Output::AMQP - output messages to AMQP.
58              
59             =head1 SYNOPSIS
60              
61             message-pass --input STDIN --output AMQP --output_options '{"exchange_name":"test","hostname":"127.0.0.1","username":"guest","password":"guest"}'
62              
63             =head1 DESCRIPTION
64              
65             A L<Message::Passing> L<AnyEvent::RabbitMQ> output class.
66              
67             Can be used as part of a chain of classes with the L<message-pass> utility, or directly as
68             a logger in normal perl applications.
69              
70             =head1 ATTRIBUTES
71              
72             =head2 routing_key
73              
74             The routing key for all messages, defaults to ''.
75              
76             =head2 header_cb
77              
78             Optional callback function which gets passed the message before it is
79             serialized using L</serialize_cb>.
80             Should return a hashref which gets passed to publish( header => ).
81              
82             NOTE: if you want to set the message headers (note the s) you have to pass them inside headers, e.g.:
83              
84             {
85             content_type => 'application/json',
86             headers => {
87             key => 'value',
88             }
89             }
90              
91             =head2 serialize_cb
92              
93             Optional callback function which gets passed the message and should return a
94             scalar. This is useful when passing structured messages e.g. hashrefs or
95             objects where some attributes should be accessible for the L</header_cb> function.
96             If the serialization happens before using a L<Message::Passing::Role::Filter>
97             it would require to deserialize it again in header_cb.
98             To use a Message::Passing filter you can instantiate it and pass it's filter
99             function to serialize_cb:
100              
101             my $filter = Message::Passing::Filter::Encoder::JSON->new(output_to => undef);
102              
103             ...
104              
105             {
106             serialize_cb => sub { $filter->filter(shift) },
107             }
108              
109             =head1 METHODS
110              
111             =head2 consume
112              
113             Sends a message.
114              
115             =head1 SEE ALSO
116              
117             =over
118              
119             =item L<Message::Passing::AMQP>
120              
121             =item L<Message::Passing::Input::AMQP>
122              
123             =item L<Message::Passing>
124              
125             =item L<AMQP>
126              
127             =item L<http://www.zeromq.org/>
128              
129             =back
130              
131             =head1 AUTHOR, COPYRIGHT AND LICENSE
132              
133             See L<Message::Passing::AMQP>.
134              
135             =cut