File Coverage

blib/lib/Mojo/RabbitMQ/Client/Publisher.pm
Criterion Covered Total %
statement 15 65 23.0
branch 0 10 0.0
condition 1 2 50.0
subroutine 5 9 55.5
pod 1 1 100.0
total 22 87 25.2


line stmt bran cond sub pod time code
1             package Mojo::RabbitMQ::Client::Publisher;
2 5     5   37 use Mojo::Base -base;
  5         12  
  5         46  
3              
4 5     5   921 use Mojo::Promise;
  5         13  
  5         27  
5 5     5   2675 use Mojo::JSON qw(encode_json);
  5         95035  
  5         366  
6 5     5   45 use Scalar::Util 'weaken';
  5         33  
  5         345  
7             require Mojo::RabbitMQ::Client;
8              
9 5   50 5   34 use constant DEBUG => $ENV{MOJO_RABBITMQ_DEBUG} // 0;
  5         12  
  5         4544  
10              
11             has url => undef;
12             has client => undef;
13             has channel => undef;
14             has setup => 0;
15             has defaults => sub { {} };
16              
17             sub publish_p {
18 0     0 1   my $self = shift;
19 0           my $body = shift;
20 0           my $headers = {};
21 0           my %args = ();
22              
23 0 0         if (ref($_[0]) eq 'HASH') {
24 0           $headers = shift;
25             }
26 0 0         if (@_) {
27 0           %args = (@_);
28             }
29              
30 0           my $promise = Mojo::Promise->new()->resolve;
31              
32 0           weaken $self;
33 0 0         unless ($self->client) {
34             $promise = $promise->then(
35             sub {
36 0     0     warn "-- spawn new client\n" if DEBUG;
37 0           my $client_promise = Mojo::Promise->new();
38              
39 0           my $client = Mojo::RabbitMQ::Client->new(url => $self->url);
40 0           $self->client($client);
41              
42             # Catch all client related errors
43 0           $self->client->catch(sub { $client_promise->reject($_[1]) });
  0            
44              
45             # When connection is in Open state, open new channel
46             $self->client->on(
47             open => sub {
48 0           warn "-- client open\n" if DEBUG;
49 0           $client_promise->resolve;
50             }
51 0           );
52              
53             # Start connection
54 0           $client->connect;
55              
56 0           return $client_promise;
57             }
58 0           );
59             }
60              
61             # Create a new channel with auto-assigned id
62 0 0         unless ($self->channel) {
63             $promise = $promise->then(
64             sub {
65 0     0     warn "-- create new channel\n" if DEBUG;
66 0           my $channel_promise = Mojo::Promise->new();
67              
68 0           my $channel = Mojo::RabbitMQ::Client::Channel->new();
69              
70 0           $channel->catch(sub { $channel_promise->reject($_[1]) });
  0            
71              
72             $channel->on(
73             open => sub {
74 0           my ($channel) = @_;
75 0           $self->channel($channel);
76              
77 0           warn "-- channel opened\n" if DEBUG;
78              
79 0           $channel_promise->resolve;
80             }
81 0           );
82 0           $channel->on(close => sub { warn 'Channel closed: ' . $_[1]->method_frame->reply_text; });
  0            
83              
84 0           $self->client->open_channel($channel);
85              
86 0           return $channel_promise;
87             }
88 0           );
89             }
90              
91             $promise = $promise->then(
92             sub {
93 0     0     warn "-- publish message\n" if DEBUG;
94 0           my $query = $self->client->url->query;
95 0           my $exchange_name = $query->param('exchange');
96 0           my $routing_key = $query->param('routing_key');
97 0           my %headers = (content_type => 'text/plain', %$headers);
98              
99 0 0         if (ref($body)) {
100 0           $headers{content_type} = 'application/json';
101 0           $body = encode_json $body;
102             }
103              
104 0           return $self->channel->publish_p(
105             exchange => $exchange_name,
106             routing_key => $routing_key,
107             mandatory => 0,
108             immediate => 0,
109             header => \%headers,
110             %args,
111             body => $body
112             );
113             }
114 0           );
115              
116 0           return $promise;
117             }
118              
119             1;
120              
121             =encoding utf8
122              
123             =head1 NAME
124              
125             Mojo::RabbitMQ::Client::Publisher - simple Mojo::RabbitMQ::Client based publisher
126              
127             =head1 SYNOPSIS
128              
129             use Mojo::RabbitMQ::Client::Publisher;
130             my $publisher = Mojo::RabbitMQ::Client::Publisher->new(
131             url => 'amqp://guest:guest@127.0.0.1:5672/?exchange=mojo&routing_key=mojo'
132             );
133              
134             $publisher->publish_p(
135             {encode => { to => 'json'}},
136             routing_key => 'mojo_mq'
137             )->then(sub {
138             say "Message published";
139             })->catch(sub {
140             die "Publishing failed"
141             })->wait;
142              
143             =head1 DESCRIPTION
144              
145              
146              
147             =head1 ATTRIBUTES
148              
149             L<Mojo::RabbitMQ::Client::Publisher> has the following attributes.
150              
151             =head2 url
152              
153             Sets all connection parameters in one string, according to specification from
154             L<https://www.rabbitmq.com/uri-spec.html>.
155              
156             For a detailed description please see L<Mojo::RabbitMQ::Client/url>.
157              
158             =head1 METHODS
159              
160             L<Mojo::RabbitMQ::Client::Publisher> implements only a single method.
161              
162             =head2 publish_p
163              
164             $publisher->publish_p('simple plain text body');
165              
166             $publisher->publish_p({ some => 'json' });
167              
168             $publisher->publish_p($body, { header => 'content' }, routing_key => 'mojo', mandatory => 1);
169              
170             Method signature
171              
172             publish_p($body!, \%headers?, *@params)
173              
174             =over 2
175              
176             =item body
177              
178             First argument is mandatory body content of published message.
179             Any reference passed here will be encoded as JSON and accordingly the C<content_type> header
180             will be set to C<application/json>.
181              
182             =item headers
183              
184             If second argument is a HASHREF it will be merged to message headers.
185              
186             =item params
187              
188             Any other arguments will be considered key/value pairs and passed to the Client's publish
189             method as arguments overriding everything besides body argument.
190              
191             So this:
192              
193             $publisher->publish_p({ json => 'object' }, { header => 'content' });
194              
195             is similar to this:
196              
197             $publisher->publish_p({ json => 'object' }, header => { header => 'content' });
198              
199             But beware - headers passed as a HASHREF get merged into the header constructed by the Publisher,
200             but params override values; so if you pass C<header> as a param like this, it will override the
201             header constructed by the Publisher, and the message will lack the C<content_type> header, even
202             though you passed a reference as the body argument! With the first example, the C<content_type>
203             header would be included.
204              
205             =back
206              
207             =head1 SEE ALSO
208              
209             L<Mojo::RabbitMQ::Client>
210              
211             =head1 COPYRIGHT AND LICENSE
212              
213             Copyright (C) 2015-2017, Sebastian Podjasek and others
214              
215             This program is free software, you can redistribute it and/or modify it under the terms of the Artistic License version 2.0.
216              
217             =cut