File Coverage

blib/lib/Mojo/RabbitMQ/Client.pm
Criterion Covered Total %
statement 90 287 31.3
branch 22 90 24.4
condition 6 33 18.1
subroutine 22 56 39.2
pod 8 10 80.0
total 148 476 31.0


line stmt bran cond sub pod time code
1             package Mojo::RabbitMQ::Client;
2 5     5   169549 use Mojo::Base 'Mojo::EventEmitter';
  5         802118  
  5         46  
3              
4 5     5   8619 use Carp qw(croak confess);
  5         14  
  5         305  
5 5     5   2336 use Mojo::URL;
  5         37751  
  5         40  
6 5     5   2384 use Mojo::Home;
  5         155982  
  5         265  
7 5     5   2743 use Mojo::IOLoop;
  5         556511  
  5         37  
8 5     5   292 use Mojo::Parameters;
  5         13  
  5         53  
9 5     5   159 use Mojo::Promise;
  5         11  
  5         48  
10 5     5   176 use Mojo::Util qw(url_unescape dumper);
  5         13  
  5         272  
11 5     5   39 use List::Util qw(none);
  5         11  
  5         276  
12 5     5   74 use Scalar::Util qw(blessed weaken);
  5         12  
  5         223  
13 5     5   33 use File::Basename 'dirname';
  5         11  
  5         318  
14 5     5   2551 use File::ShareDir qw(dist_file);
  5         119289  
  5         292  
15              
16 5     5   2246 use Net::AMQP;
  5         286856  
  5         189  
17 5     5   46 use Net::AMQP::Common qw(:all);
  5         11  
  5         1114  
18              
19 5     5   3543 use Mojo::RabbitMQ::Client::Channel;
  5         19  
  5         43  
20 5     5   240 use Mojo::RabbitMQ::Client::LocalQueue;
  5         11  
  5         30  
21             require Mojo::RabbitMQ::Client::Consumer;
22             require Mojo::RabbitMQ::Client::Publisher;
23              
24             our $VERSION = "0.2.4";
25              
26 5   50 5   403 use constant DEBUG => $ENV{MOJO_RABBITMQ_DEBUG} // 0;
  5         11  
  5         22730  
27              
28             has is_open => 0;
29             has url => undef;
30             has tls => sub { shift->_uri_handler('tls') };
31             has user => sub { shift->_uri_handler('user') };
32             has pass => sub { shift->_uri_handler('pass') };
33             has host => sub { shift->_uri_handler('host') };
34             has port => sub { shift->_uri_handler('port') };
35             has vhost => sub { shift->_uri_handler('vhost') };
36             has params => sub { shift->_uri_handler('params') // Mojo::Parameters->new };
37             has connect_timeout => sub { $ENV{MOJO_CONNECT_TIMEOUT} // 10 };
38             has heartbeat_timeout => 60;
39             has heartbeat_received => 0; # When did we receive last heartbeat
40             has heartbeat_sent => 0; # When did we sent last heartbeat
41             has ioloop => sub { Mojo::IOLoop->singleton };
42             has max_buffer_size => 16384;
43             has max_channels => 0;
44             has queue => sub { Mojo::RabbitMQ::Client::LocalQueue->new };
45             has channels => sub { {} };
46             has stream_id => undef;
47              
48             sub connect {
49 0     0 1 0 my $self = shift;
50 0         0 $self->{buffer} = '';
51              
52 0         0 my $id;
53 0     0   0 $id = $self->_connect(sub { $self->_connected($id) });
  0         0  
54 0         0 $self->stream_id($id);
55              
56 0         0 return $id;
57             }
58              
59             sub connect_p {
60 0     0 0 0 my $self = shift;
61 0         0 my $promise = Mojo::Promise->new;
62              
63 0         0 my $id;
64              
65 0         0 weaken $self;
66             my $handler = sub {
67 0     0   0 my ($err) = @_;
68 0 0       0 if (defined $err) {
69 0         0 return $promise->reject($err);
70             }
71              
72 0         0 return $promise->resolve($self);
73 0         0 };
74              
75 0     0   0 $id = $self->_connect(sub { $self->_connected($id, $handler) });
  0         0  
76 0         0 $self->stream_id($id);
77              
78 0         0 return $promise;
79             }
80              
81             sub consumer {
82 2     2 1 1420 my ($class, @params) = @_;
83 2 100       21 croak "consumer is a static method" if ref $class;
84              
85 1         11 return Mojo::RabbitMQ::Client::Consumer->new(@params);
86             }
87              
88             sub publisher {
89 2     2 1 1386 my ($class, @params) = @_;
90 2 100       15 croak "publisher is a static method" if ref $class;
91              
92 1         7 return Mojo::RabbitMQ::Client::Publisher->new(@params);
93             }
94              
95             sub param {
96 6     6 1 11989 my $self = shift;
97 6 50       19 return undef unless defined $self->params;
98 6         63 return $self->params->param(@_);
99             }
100              
101             sub add_channel {
102 0     0 1 0 my $self = shift;
103 0         0 my $channel = shift;
104              
105 0         0 my $id = $channel->id;
106 0 0 0     0 if ($id and $self->channels->{$id}) {
107 0         0 return $channel->emit(
108             error => 'Channel with id: ' . $id . ' already defined');
109             }
110              
111 0 0 0     0 if ($self->max_channels > 0
112 0         0 and scalar keys %{$self->channels} >= $self->max_channels)
113             {
114 0         0 return $channel->emit(error => 'Maximum number of channels reached');
115             }
116              
117 0 0       0 if (not $id) {
118 0         0 for my $candidate_id (1 .. (2**16 - 1)) {
119 0 0       0 next if defined $self->channels->{$candidate_id};
120 0         0 $id = $candidate_id;
121 0         0 last;
122             }
123 0 0       0 unless ($id) {
124 0         0 return $channel->emit(error => 'Ran out of channel ids');
125             }
126             }
127              
128 0         0 $self->channels->{$id} = $channel->id($id)->client($self);
129 0         0 weaken $channel->{client};
130              
131 0         0 return $channel;
132             }
133              
134             sub acquire_channel_p {
135 0     0 0 0 my $self = shift;
136              
137 0         0 my $promise = Mojo::Promise->new;
138              
139 0         0 my $channel = Mojo::RabbitMQ::Client::Channel->new();
140 0     0   0 $channel->catch(sub { $promise->reject(@_); undef $promise });
  0         0  
  0         0  
141 0     0   0 $channel->on(close => sub { warn "Channel closed" });
  0         0  
142 0     0   0 $channel->on(open => sub { $promise->resolve(@_); undef $promise });
  0         0  
  0         0  
143              
144 0         0 $self->open_channel($channel);
145              
146 0         0 return $promise;
147             }
148              
149             sub open_channel {
150 0     0 1 0 my $self = shift;
151 0         0 my $channel = shift;
152              
153 0 0       0 return $channel->emit(error => 'Client connection not opened')
154             unless $self->is_open;
155              
156 0         0 $self->add_channel($channel)->open;
157              
158 0         0 return $self;
159             }
160              
161             sub delete_channel {
162 0     0 1 0 my $self = shift;
163 0         0 return delete $self->channels->{shift};
164             }
165              
166             sub close {
167 0     0 1 0 my $self = shift;
168              
169 0         0 weaken $self;
170             $self->_write_expect(
171             'Connection::Close' => {},
172             'Connection::CloseOk' => sub {
173 0     0   0 warn "-- Connection::CloseOk\n" if DEBUG;
174 0         0 $self->emit('close');
175 0         0 $self->_close;
176             },
177             sub {
178 0     0   0 $self->_close;
179             }
180 0         0 );
181             }
182              
183 0     0   0 sub _loop { $_[0]->ioloop }
184              
185             sub _error {
186 0     0   0 my ($self, $id, $err) = @_;
187              
188 0         0 $self->emit(error => $err);
189             }
190              
191             sub _uri_handler {
192 20     20   37 my $self = shift;
193 20         35 my $attr = shift;
194              
195 20 100       49 return undef unless defined $self->url;
196              
197 18 50 33     107 $self->url(Mojo::URL->new($self->url))
198             unless blessed $self->url && $self->url->isa('Mojo::URL');
199              
200             # Set some defaults
201 18         3482 my %defaults = (
202             tls => 0,
203             user => undef,
204             pass => undef,
205             host => 'localhost',
206             port => 5672,
207             vhost => '/',
208             params => undef
209             );
210              
211             # Check secure scheme in url
212 18 100       41 $defaults{tls} = 1
213             if $self->url->scheme
214             =~ /^(amqp|rabbitmq)s$/; # Fallback support for rabbitmq scheme name
215 18 100       159 $defaults{port} = 5671 if $defaults{tls};
216              
217             # Get host & port
218 18 100 66     139 $defaults{host} = $self->url->host
219             if defined $self->url->host && $self->url->host ne '';
220 18 100       315 $defaults{port} = $self->url->port if defined $self->url->port;
221              
222             # Get user & password
223 18         137 my $userinfo = $self->url->userinfo;
224 18 100       121 if (defined $userinfo) {
225 5         24 my ($user, $pass) = split /:/, $userinfo;
226 5         11 $defaults{user} = $user;
227 5         10 $defaults{pass} = $pass;
228             }
229              
230 18         36 my $vhost = url_unescape $self->url->path;
231 18         1770 $vhost =~ s|^/(.+)$|$1|;
232 18 100 66     808 $defaults{vhost} = $vhost if defined $vhost && $vhost ne '';
233              
234             # Query params
235 18         866 my $params = $defaults{params} = $self->url->query;
236              
237             # Handle common aliases to internal names
238 18         393 my %aliases = (
239             cacertfile => 'ca',
240             certfile => 'cert',
241             keyfile => 'key',
242             fail_if_no_peer_cert => 'verify',
243             connection_timeout => 'timeout'
244             );
245             $params->param($aliases{$_}, $params->param($_))
246 18         59 foreach grep { defined $params->param($_) } keys %aliases;
  90         2114  
247              
248             # Some query parameters are translated to attribute values
249 18         755 my %attributes = (
250             heartbeat_timeout => 'heartbeat',
251             connect_timeout => 'timeout',
252             max_channels => 'channel_max'
253             );
254             $self->$_($params->param($attributes{$_}))
255 18         48 foreach grep { defined $params->param($attributes{$_}) } keys %attributes;
  54         682  
256              
257             # Set all
258 18         499 $self->$_($defaults{$_}) foreach keys %defaults;
259              
260 18         702 return $self->$attr;
261             }
262              
263             sub _close {
264 0     0   0 my $self = shift;
265 0         0 $self->_loop->stream($self->stream_id)->close_gracefully;
266             }
267              
268             sub _handle {
269 0     0   0 my ($self, $id, $close) = @_;
270              
271 0         0 $self->emit('disconnect');
272              
273 0         0 $self->_loop->remove($id);
274             }
275              
276             sub _read {
277 0     0   0 my ($self, $id, $chunk) = @_;
278              
279 0         0 warn "<- @{[dumper $chunk]}" if DEBUG;
280 0         0 $self->{buffer} .= $chunk;
281 0         0 $self->_parse_frames;
282              
283 0         0 return;
284             }
285              
286             sub _parse_frames {
287 0     0   0 my $self = shift;
288              
289 0         0 for my $frame (Net::AMQP->parse_raw_frames(\$self->{buffer})) {
290              
291 0 0 0     0 if ($frame->isa('Net::AMQP::Frame::Heartbeat')) {
    0          
    0          
292 0         0 $self->heartbeat_received(time());
293             }
294             elsif ($frame->isa('Net::AMQP::Frame::Method')
295             and $frame->method_frame->isa('Net::AMQP::Protocol::Connection::Close'))
296             {
297 0         0 $self->is_open(0);
298              
299 0         0 $self->_write_frame(Net::AMQP::Protocol::Connection::CloseOk->new());
300             $self->emit(disconnect => "Server side disconnection: "
301 0         0 . $frame->method_frame->{reply_text});
302             }
303             elsif ($frame->channel == 0) {
304 0         0 $self->queue->push($frame);
305             }
306             else {
307 0         0 my $channel = $self->channels->{$frame->channel};
308 0 0       0 if (defined $channel) {
309 0         0 $channel->_push_queue_or_consume($frame);
310             }
311             else {
312 0   0     0 $self->emit(
313             error => "Unknown channel id received: "
314             . ($frame->channel // '(undef)'),
315             $frame
316             );
317             }
318             }
319             }
320             }
321              
322             sub _connect {
323 0     0   0 my ($self, $cb) = @_;
324              
325             # Options
326             # Parse according to (https://www.rabbitmq.com/uri-spec.html)
327 0         0 my $options = {
328             address => $self->host,
329             port => $self->port,
330             timeout => $self->connect_timeout,
331             tls => $self->tls,
332             tls_ca => scalar $self->param('ca'),
333             tls_cert => scalar $self->param('cert'),
334             tls_key => scalar $self->param('key')
335             };
336 0         0 my $verify = $self->param('verify');
337 0 0       0 $options->{tls_verify} = hex $verify if defined $verify;
338              
339             # Connect
340 0         0 weaken $self;
341 0         0 my $id;
342             return $id = $self->_loop->client(
343             $options => sub {
344 0     0   0 my ($loop, $err, $stream) = @_;
345              
346             # Connection error
347 0 0       0 return unless $self;
348 0 0       0 return $self->_error($id, $err) if $err;
349              
350 0         0 $self->emit(connect => $stream);
351              
352             # Connection established
353 0         0 $stream->on(timeout => sub { $self->_error($id, 'Inactivity timeout') });
  0         0  
354 0 0       0 $stream->on(close => sub { $self && $self->_handle($id, 1) });
  0         0  
355 0 0       0 $stream->on(error => sub { $self && $self->_error($id, pop) });
  0         0  
356 0 0       0 $stream->on(read => sub { $self && $self->_read($id, pop) });
  0         0  
357 0         0 $cb->();
358             }
359 0         0 );
360             }
361              
362             sub _connected {
363 0     0   0 my ($self, $id, $cb) = @_;
364              
365             # Inactivity timeout
366 0         0 my $stream = $self->_loop->stream($id)->timeout(0);
367              
368             # Store connection information in transaction
369 0         0 my $handle = $stream->handle;
370              
371             # Detect that xml spec was already loaded
372 0         0 my $loaded = eval { Net::AMQP::Protocol::Connection::StartOk->new; 1 };
  0         0  
  0         0  
373 0 0       0 unless ($loaded) { # Load AMQP specs
374 0         0 my $file = "amqp0-9-1.stripped.extended.xml";
375              
376             # Original spec is in "fixed_amqp0-8.xml"
377 0         0 my $share = dist_file('Mojo-RabbitMQ-Client', $file);
378 0         0 Net::AMQP::Protocol->load_xml_spec($share);
379             }
380              
381 0         0 $self->_write($id => Net::AMQP::Protocol->header);
382              
383 0         0 weaken $self;
384             $self->_expect(
385             'Connection::Start' => sub {
386 0     0   0 my $frame = shift;
387              
388 0         0 my @mechanisms = split /\s/, $frame->method_frame->mechanisms;
389             return $self->emit(error => 'AMQPLAIN is not found in mechanisms')
390 0 0       0 if none { $_ eq 'AMQPLAIN' } @mechanisms;
  0         0  
391              
392 0         0 my @locales = split /\s/, $frame->method_frame->locales;
393             return $self->emit(error => 'en_US is not found in locales')
394 0 0       0 if none { $_ eq 'en_US' } @locales;
  0         0  
395              
396 0         0 $self->{_server_properties} = $frame->method_frame->server_properties;
397              
398 0         0 warn "-- Connection::Start {product: " . $self->{_server_properties}->{product} . ", version: " . $self->{_server_properties}->{version} . "}\n" if DEBUG;
399 0         0 $self->_write_frame(
400             Net::AMQP::Protocol::Connection::StartOk->new(
401             client_properties => {
402             platform => 'Perl',
403             product => __PACKAGE__,
404             information => 'https://github.com/inway/mojo-rabbitmq-client',
405             version => __PACKAGE__->VERSION,
406             },
407             mechanism => 'AMQPLAIN',
408             response => {LOGIN => $self->user, PASSWORD => $self->pass},
409             locale => 'en_US',
410             ),
411             );
412              
413 0         0 $self->_tune($id, $cb);
414             },
415             sub {
416 0     0   0 $self->emit(error => 'Unable to start connection: ' . shift);
417             }
418 0         0 );
419             }
420              
421             sub _tune {
422 0     0   0 my ($self, $id, $cb) = @_;
423              
424 0         0 weaken $self;
425             $self->_expect(
426             'Connection::Tune' => sub {
427 0     0   0 my $frame = shift;
428              
429 0         0 my $method_frame = $frame->method_frame;
430 0         0 $self->max_buffer_size($method_frame->frame_max);
431              
432 0   0     0 my $heartbeat = $self->heartbeat_timeout || $method_frame->heartbeat;
433              
434 0         0 warn "-- Connection::Tune {frame_max: " . $method_frame->frame_max . ", heartbeat: " . $method_frame->heartbeat . "}\n" if DEBUG;
435             # Confirm
436 0         0 $self->_write_frame(
437             Net::AMQP::Protocol::Connection::TuneOk->new(
438             channel_max => $method_frame->channel_max,
439             frame_max => $method_frame->frame_max,
440             heartbeat => $heartbeat,
441             ),
442             );
443              
444             # According to https://www.rabbitmq.com/amqp-0-9-1-errata.html
445             # The client should start sending heartbeats after receiving a Connection.Tune
446             # method, and start monitoring heartbeats after sending Connection.Open.
447             # -and-
448             # Heartbeat frames are sent about every timeout / 2 seconds. After two missed
449             # heartbeats, the peer is considered to be unreachable.
450             $self->{heartbeat_tid} = $self->_loop->recurring(
451             $heartbeat / 2 => sub {
452 0 0       0 return unless time() - $self->heartbeat_sent > $heartbeat / 2;
453 0         0 $self->_write_frame(Net::AMQP::Frame::Heartbeat->new());
454 0         0 $self->heartbeat_sent(time());
455             }
456 0 0       0 ) if $heartbeat;
457              
458             $self->_write_expect(
459             'Connection::Open' =>
460             {virtual_host => $self->vhost, capabilities => '', insist => 1,},
461             'Connection::OpenOk' => sub {
462 0         0 warn "-- Connection::OpenOk\n" if DEBUG;
463              
464 0         0 $self->is_open(1);
465 0         0 $self->emit('open');
466 0 0       0 $cb->() if defined $cb;
467             },
468             sub {
469 0         0 my $err = shift;
470 0         0 $self->emit(error => 'Unable to open connection: ' . $err);
471 0 0       0 $cb->($err) if defined $cb;
472             }
473 0         0 );
474             },
475             sub {
476 0     0   0 $self->emit(error => 'Unable to tune connection: ' . shift);
477             }
478 0         0 );
479             }
480              
481             sub _write_expect {
482 0     0   0 my $self = shift;
483 0         0 my ($method, $args, $exp, $cb, $failure_cb, $channel_id) = @_;
484 0         0 $method = 'Net::AMQP::Protocol::' . $method;
485              
486 0   0     0 $channel_id ||= 0;
487              
488 0         0 my $method_frame = Net::AMQP::Frame::Method->new(
489             method_frame => $method->new(%$args)
490             );
491              
492 0         0 $self->_write_frame(
493             $method_frame,
494             $channel_id
495             );
496              
497 0         0 return $self->_expect($exp, $cb, $failure_cb, $channel_id);
498             }
499              
500             sub _expect {
501 0     0   0 my $self = shift;
502 0         0 my ($exp, $cb, $failure_cb, $channel_id) = @_;
503 0 0       0 my @expected = ref($exp) eq 'ARRAY' ? @$exp : ($exp);
504              
505 0   0     0 $channel_id ||= 0;
506              
507 0         0 my $queue;
508 0 0       0 if (!$channel_id) {
509 0         0 $queue = $self->queue;
510             }
511             else {
512 0         0 my $channel = $self->channels->{$channel_id};
513 0 0       0 if (defined $channel) {
514 0         0 $queue = $channel->queue;
515             }
516             else {
517 0   0     0 $failure_cb->(
518             "Unknown channel id received: " . ($channel_id // '(undef)'));
519             }
520             }
521              
522 0 0       0 return unless $queue;
523              
524             $queue->get(
525             sub {
526 0     0   0 my $frame = shift;
527              
528 0 0       0 return $failure_cb->("Received data is not method frame")
529             if not $frame->isa("Net::AMQP::Frame::Method");
530              
531 0         0 my $method_frame = $frame->method_frame;
532 0         0 for my $exp (@expected) {
533 0 0       0 return $cb->($frame)
534             if $method_frame->isa("Net::AMQP::Protocol::" . $exp);
535             }
536              
537 0         0 $failure_cb->("Method is not "
538             . join(', ', @expected)
539             . ". It's "
540             . ref($method_frame));
541             }
542 0         0 );
543             }
544              
545             sub _write_frame {
546 0     0   0 my $self = shift;
547 0         0 my $id = $self->stream_id;
548 0         0 my ($out, $channel, $cb) = @_;
549              
550 0 0       0 if ($out->isa('Net::AMQP::Protocol::Base')) {
551 0         0 $out = $out->frame_wrap;
552             }
553 0   0     0 $out->channel($channel // 0);
554              
555 0         0 return $self->_write($id, $out->to_raw_frame, $cb);
556             }
557              
558             sub _write {
559 0     0   0 my $self = shift @_;
560 0         0 my $id = shift @_;
561 0         0 my $frame = shift @_;
562 0         0 my $cb = shift @_;
563              
564 0         0 warn "-> @{[dumper $frame]}" if DEBUG;
565              
566 0         0 utf8::downgrade($frame);
567 0 0       0 $self->_loop->stream($id)->write($frame => $cb)
568             if defined $self->_loop->stream($id);
569             }
570              
571             sub DESTROY {
572 20     20   52194 my $self = shift;
573 20 50       67 my $ioloop = $self->ioloop or return;
574 20         137 my $heartbeat_tid = $self->{heartbeat_tid};
575              
576 20 50       302 $ioloop->remove($heartbeat_tid) if $heartbeat_tid;
577             }
578              
579             1;
580              
581             =encoding utf8
582              
583             =head1 NAME
584              
585             Mojo::RabbitMQ::Client - Mojo::IOLoop based RabbitMQ client
586              
587             =head1 SYNOPSIS
588              
589             use Mojo::RabbitMQ::Client;
590              
591             # Supply URL according to (https://www.rabbitmq.com/uri-spec.html)
592             my $client = Mojo::RabbitMQ::Client->new(
593             url => 'amqp://guest:guest@127.0.0.1:5672/');
594              
595             # Catch all client related errors
596             $client->catch(sub { warn "Some error caught in client"; });
597              
598             # When connection is in Open state, open new channel
599             $client->on(
600             open => sub {
601             my ($client) = @_;
602              
603             # Create a new channel with auto-assigned id
604             my $channel = Mojo::RabbitMQ::Client::Channel->new();
605              
606             $channel->catch(sub { warn "Error on channel received"; });
607              
608             $channel->on(
609             open => sub {
610             my ($channel) = @_;
611             $channel->qos(prefetch_count => 1)->deliver;
612              
613             # Publish some example message to test_queue
614             my $publish = $channel->publish(
615             exchange => 'test',
616             routing_key => 'test_queue',
617             body => 'Test message',
618             mandatory => 0,
619             immediate => 0,
620             header => {}
621             );
622             # Deliver this message to server
623             $publish->deliver;
624              
625             # Start consuming messages from test_queue
626             my $consumer = $channel->consume(queue => 'test_queue');
627             $consumer->on(message => sub { say "Got a message" });
628             $consumer->deliver;
629             }
630             );
631             $channel->on(close => sub { $log->error('Channel closed') });
632              
633             $client->open_channel($channel);
634             }
635             );
636              
637             # Start connection
638             $client->connect();
639              
640             # Start Mojo::IOLoop if not running already
641             Mojo::IOLoop->start unless Mojo::IOLoop->is_running;
642              
643             =head2 CONSUMER
644              
645             use Mojo::RabbitMQ::Client;
646             my $consumer = Mojo::RabbitMQ::Client->consumer(
647             url => 'amqp://guest:guest@127.0.0.1:5672/?exchange=mojo&queue=mojo',
648             defaults => {
649             qos => {prefetch_count => 1},
650             queue => {durable => 1},
651             consumer => {no_ack => 0},
652             }
653             );
654              
655             $consumer->catch(sub { die "Some error caught in Consumer" } );
656             $consumer->on('success' => sub { say "Consumer ready" });
657             $consumer->on(
658             'message' => sub {
659             my ($consumer, $message) = @_;
660              
661             $consumer->channel->ack($message)->deliver;
662             }
663             );
664             $consumer->start();
665              
666             Mojo::IOLoop->start unless Mojo::IOLoop->is_running;
667              
668             =head2 PUBLISHER
669              
670             use Mojo::RabbitMQ::Client;
671             my $publisher = Mojo::RabbitMQ::Client->publisher(
672             url => 'amqp://guest:guest@127.0.0.1:5672/?exchange=mojo&routing_key=mojo'
673             );
674              
675             $publisher->publish('plain text');
676              
677             $publisher->publish(
678             {encode => { to => 'json'}},
679             routing_key => 'mojo_mq'
680             )->then(sub {
681             say "Message published";
682             })->catch(sub {
683             die "Publishing failed"
684             })->wait;
685              
686             =head1 DESCRIPTION
687              
688             L<Mojo::RabbitMQ::Client> is a rewrite of L<AnyEvent::RabbitMQ> to work on top of L<Mojo::IOLoop>.
689              
690             =head1 EVENTS
691              
692             L<Mojo::RabbitMQ::Client> inherits all events from L<Mojo::EventEmitter> and can emit the
693             following new ones.
694              
695             =head2 connect
696              
697             $client->on(connect => sub {
698             my ($client, $stream) = @_;
699             ...
700             });
701              
702             Emitted when TCP/IP connection with RabbitMQ server is established.
703              
704             =head2 open
705              
706             $client->on(open => sub {
707             my ($client) = @_;
708             ...
709             });
710              
711             Emitted when AMQP protocol Connection.Open-Ok method is received.
712              
713             =head2 close
714              
715             $client->on(close => sub {
716             my ($client) = @_;
717             ...
718             });
719              
720             Emitted on reception of Connection.Close-Ok method.
721              
722             =head2 disconnect
723              
724             $client->on(disconnect => sub {
725             my ($client) = @_;
726             ...
727             });
728              
729             Emitted when TCP/IP connection gets disconnected.
730              
731             =head1 ATTRIBUTES
732              
733             L<Mojo::RabbitMQ::Client> has following attributes.
734              
735             =head2 tls
736              
737             my $tls = $client->tls;
738             $client = $client->tls(1)
739              
740             Force secure connection. Default is disabled (C<0>).
741              
742             =head2 user
743              
744             my $user = $client->user;
745             $client = $client->user('guest')
746              
747             Sets username for authorization, by default it's not defined.
748              
749             =head2 pass
750              
751             my $pass = $client->pass;
752             $client = $client->pass('secret')
753              
754             Sets user password for authorization, by default it's not defined.
755              
756             =head2 host
757              
758             my $host = $client->host;
759             $client = $client->host('localhost')
760              
761             Hostname or IP address of RabbitMQ server. Defaults to C<localhost>.
762              
763             =head2 port
764              
765             my $port = $client->port;
766             $client = $client->port(1234)
767              
768             Port on which RabbitMQ server listens for new connections.
769             Defaults to C<5672>, which is standard RabbitMQ server listen port.
770              
771             =head2 vhost
772              
773             my $vhost = $client->vhost;
774             $client = $client->vhost('/')
775              
776             RabbitMQ virtual server to use. Default is C</>.
777              
778             =head2 params
779              
780             my $params = $client->params;
781             $client = $client->params(Mojo::Parameters->new('verify=1'))
782              
783             Sets additional parameters for connection. Default is not defined.
784              
785             For list of supported parameters see L</"SUPPORTED QUERY PARAMETERS">.
786              
787             =head2 url
788              
789             my $url = $client->url;
790             $client = $client->url('amqp://...');
791              
792             Sets all connection parameters in one string, according to specification from
793             L<https://www.rabbitmq.com/uri-spec.html>.
794              
795             amqp_URI = "amqp[s]://" amqp_authority [ "/" vhost ] [ "?" query ]
796              
797             amqp_authority = [ amqp_userinfo "@" ] host [ ":" port ]
798              
799             amqp_userinfo = username [ ":" password ]
800              
801             username = *( unreserved / pct-encoded / sub-delims )
802              
803             password = *( unreserved / pct-encoded / sub-delims )
804              
805             vhost = segment
806              
807             =head2 heartbeat_timeout
808              
809             my $timeout = $client->heartbeat_timeout;
810             $client = $client->heartbeat_timeout(180);
811              
812             Heartbeats are used to monitor peer reachability in AMQP.
813             Default value is C<60> seconds, if set to C<0> no heartbeats will be sent.
814              
815             =head2 connect_timeout
816              
817             my $timeout = $client->connect_timeout;
818             $client = $client->connect_timeout(5);
819              
820             Connection timeout used by L<Mojo::IOLoop::Client>.
821             Defaults to environment variable C<MOJO_CONNECT_TIMEOUT> or C<10> seconds
822             if nothing else is set.
823              
824             =head2 max_channels
825              
826             my $max_channels = $client->max_channels;
827             $client = $client->max_channels(10);
828              
829             Maximum number of channels allowed to be active. Defaults to C<0> which
830             means no implicit limit.
831              
832             When you try to call C<add_channel> over the limit, an C<error> event will be
833             emitted on the channel saying: I<Maximum number of channels reached>.
834              
835             =head1 STATIC METHODS
836              
837             =head2 consumer
838              
839             my $client = Mojo::RabbitMQ::Client->consumer(...)
840              
841             Shortcut for creating L<Mojo::RabbitMQ::Client::Consumer>.
842              
843             =head2 publisher
844              
845             my $client = Mojo::RabbitMQ::Client->publisher(...)
846              
847             Shortcut for creating L<Mojo::RabbitMQ::Client::Publisher>.
848              
849             =head1 METHODS
850              
851             L<Mojo::RabbitMQ::Client> inherits all methods from L<Mojo::EventEmitter> and implements
852             the following new ones.
853              
854             =head2 connect
855              
856             $client->connect();
857              
858             Tries to connect to RabbitMQ server and negotiate AMQP protocol.
859              
860             =head2 close
861              
862             $client->close();
863              
864             =head2 param
865              
866             my $param = $client->param('name');
867             $client = $client->param(name => 'value');
868              
869             =head2 add_channel
870              
871             my $channel = Mojo::RabbitMQ::Client::Channel->new();
872             ...
873             $channel = $client->add_channel($channel);
874             $channel->open;
875              
876             =head2 open_channel
877              
878             my $channel = Mojo::RabbitMQ::Client::Channel->new();
879             ...
880             $client->open_channel($channel);
881              
882             =head2 delete_channel
883              
884             my $removed = $client->delete_channel($channel->id);
885              
886             =head1 SUPPORTED QUERY PARAMETERS
887              
888             There's no formal specification, nevertheless a list of common parameters
889             recognized by officially supported RabbitMQ clients is maintained here:
890             L<https://www.rabbitmq.com/uri-query-parameters.html>.
891              
892             Some shortcuts (aliases) are also supported; you'll find them in parentheses.
893              
894             Aliases have lower precedence, so when both a parameter and its alias are
895             specified, only the primary parameter's value will be used.
896              
897             =head2 cacertfile (I<ca>)
898              
899             Path to Certificate Authority file for TLS.
900              
901             =head2 certfile (I<cert>)
902              
903             Path to the client certificate file for TLS.
904              
905             =head2 keyfile (I<key>)
906              
907             Path to the client certificate private key file for TLS.
908              
909             =head2 fail_if_no_peer_cert (I<verify>)
910              
911             TLS verification mode, defaults to 0x01 on the client-side if a certificate
912             authority file has been provided, or 0x00 otherwise.
913              
914             =head2 auth_mechanism
915              
916             Currently only AMQPLAIN is supported, B<so this parameter is ignored>.
917              
918             =head2 heartbeat
919              
920             Sets requested heartbeat timeout, just like the C<heartbeat_timeout> attribute.
921              
922             =head2 connection_timeout (I<timeout>)
923              
924             Sets connection timeout - see L</connect_timeout> attribute.
925              
926             =head2 channel_max
927              
928             Sets maximum number of channels - see L</max_channels> attribute.
929              
930             =head1 SEE ALSO
931              
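932             L<Mojo::RabbitMQ::Client::Channel>, L<Mojo::RabbitMQ::Client::Consumer>, L<Mojo::RabbitMQ::Client::Publisher>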
933              
934             =head1 COPYRIGHT AND LICENSE
935              
936             Copyright (C) 2015-2017, Sebastian Podjasek and others
937              
938             Based on L<AnyEvent::RabbitMQ> - Copyright (C) 2010 Masahito Ikuta, maintained by C<< bobtfish@bobtfish.net >>
939              
940             This program is free software, you can redistribute it and/or modify it under the terms of the Artistic License version 2.0.
941              
942             =cut