File Coverage

blib/lib/Message/Passing/Output/AMQP.pm
Criterion Covered Total %
statement 9 21 42.8
branch 0 8 0.0
condition 0 3 0.0
subroutine 3 4 75.0
pod 1 1 100.0
total 13 37 35.1


line stmt bran cond sub pod time code
1             package Message::Passing::Output::AMQP;
2 1     1   1215 use Moo;
  1         2  
  1         7  
3 1     1   400 use Types::Standard qw( Str CodeRef );
  1         2  
  1         8  
4 1     1   690 use namespace::autoclean;
  1         3  
  1         8  
5              
6             with qw/
7             Message::Passing::AMQP::Role::DeclaresExchange
8             Message::Passing::Role::Output
9             /;
10              
11             has routing_key => (
12             isa => Str,
13             is => 'ro',
14             default => '',
15             );
16              
17             has header_cb => (
18             isa => CodeRef,
19             is => 'ro',
20             );
21              
22             has serialize_cb => (
23             isa => CodeRef,
24             is => 'ro',
25             );
26              
27             sub consume {
28 0     0 1   my $self = shift;
29 0           my $data = shift;
30 0 0 0       if (ref $data && ! defined $self->serialize_cb) {
31 0           warn("Passed non-serialized data - is a perl reference. Dropping.\n");
32 0           return;
33             }
34 0 0         unless ($self->_exchange) {
35 0           warn("No exchange yet, dropping message");
36 0           return;
37             }
38 0           my $header;
39 0 0         $header = $self->header_cb->($data)
40             if defined $self->header_cb;
41              
42 0 0         $data = $self->serialize_cb->($data)
43             if defined $self->serialize_cb;
44              
45 0           $self->_channel->publish(
46             body => $data,
47             header => $header,
48             exchange => $self->exchange_name,
49             routing_key => $self->routing_key,
50             );
51             }
52              
53             1;
54              
55             =head1 NAME
56              
57             Message::Passing::Output::AMQP - output messages to AMQP.
58              
59             =head1 SYNOPSIS
60              
61             message-pass --input STDIN --output AMQP --output_options '{"exchange_name":"test","hostname":"127.0.0.1","username":"guest","password":"guest"}'
62              
63             =head1 DESCRIPTION
64              
65             A L<Message::Passing> L<AnyEvent::RabbitMQ> output class.
66              
67             Can be used as part of a chain of classes with the L<message-pass> utility, or directly as
68             a logger in normal perl applications.
69              
70             =head1 ATTRIBUTES
71              
72             =head2 routing_key
73              
74             The routing key for all messages, defaults to ''.
75              
76             =head2 header_cb
77              
78             Optional callback function which gets passed the message before it is
79             serialized using L</serialize_cb>.
80             Should return a hashref which gets passed to publish( header => ).
81              
82             NOTE: if you want to set the message headers (note the s) you have to pass them inside headers, e.g.:
83              
84             {
85             content_type => 'application/json',
86             headers => {
87             key => 'value',
88             }
89             }
90              
91             =head2 serialize_cb
92              
93             Optional callback function which gets passed the message and should return a
94             scalar. This is useful when passing structured messages e.g. hashrefs or
95             objects where some attributes should be accessible for the L</header_cb> function.
96             If the serialization happens before using a L<Message::Passing::Role::Filter>
97             it would require to deserialize it again in header_cb.
98             To use a Message::Passing filter you can instantiate it and pass it's filter
99             function to serialize_cb:
100              
101             my $filter = Message::Passing::Filter::Encoder::JSON->new(output_to => undef);
102              
103             ...
104              
105             {
106             serialize_cb => sub { $filter->filter(shift) },
107             }
108              
109             =head1 METHODS
110              
111             =head2 consume
112              
113             Sends a message.
114              
115             =head1 SEE ALSO
116              
117             =over
118              
119             =item L<Message::Passing::AMQP>
120              
121             =item L<Message::Passing::Input::AMQP>
122              
123             =item L<Message::Passing>
124              
125             =item L<AMQP>
126              
127             =item L<http://www.zeromq.org/>
128              
129             =back
130              
131             =head1 AUTHOR, COPYRIGHT AND LICENSE
132              
133             See L<Message::Passing::AMQP>.
134              
135             =cut