File Coverage

blib/lib/Net/Async/AMQP/ConnectionManager.pm
Criterion Covered Total %
statement 28 30 93.3
branch n/a
condition n/a
subroutine 10 10 100.0
pod n/a
total 38 40 95.0


line stmt bran cond sub pod time code
1             package Net::Async::AMQP::ConnectionManager;
2             $Net::Async::AMQP::ConnectionManager::VERSION = '2.000';
3 4     4   270065 use strict;
  4         7  
  4         95  
4 4     4   16 use warnings;
  4         4  
  4         97  
5              
6 4     4   1504 use parent qw(IO::Async::Notifier);
  4         909  
  4         15  
7              
8             =head1 NAME
9              
10             Net::Async::AMQP::ConnectionManager - handle MQ connections
11              
12             =head1 VERSION
13              
14             version 2.000
15              
16             =head1 SYNOPSIS
17              
18             use IO::Async::Loop;
19             use Net::Async::AMQP;
20             my $loop = IO::Async::Loop->new;
21             $loop->add(
22             my $cm = Net::Async::AMQP::ConnectionManager->new
23             );
24             $cm->add(
25             host => 'localhost',
26             user => 'guest',
27             pass => 'guest',
28             vhost => 'vhost',
29             );
30             $cm->request_channel->then(sub {
31             my $ch = shift;
32             Future->needs_all(
33             $ch->declare_exchange(
34             'exchange_name'
35             ),
36             $ch->declare_queue(
37             'queue_name'
38             ),
39             )->transform(done => sub { $ch })
40             })->then(sub {
41             my $ch = shift;
42             $ch->bind_queue(
43             'exchange_name',
44             'queue_name',
45             '*'
46             )
47             })->get;
48              
49             =cut
50              
51 4     4   12114 use Future;
  4         5  
  4         77  
52 4     4   14 use Future::Utils qw(call try_repeat fmap_void);
  4         4  
  4         205  
53              
54 4     4   14 use Time::HiRes ();
  4         4  
  4         43  
55 4     4   13 use Scalar::Util ();
  4         6  
  4         70  
56 4     4   1638 use List::UtilsBy ();
  4         4327  
  4         83  
57 4     4   1498 use Variable::Disposition qw(retain_future);
  4         1357  
  4         199  
58              
59 4     4   1628 use Net::Async::AMQP;
  0            
  0            
60             use Net::Async::AMQP::ConnectionManager::Channel;
61             use Net::Async::AMQP::ConnectionManager::Connection;
62              
63             =head1 DESCRIPTION
64              
65             =head2 Channel management
66              
67             Each connection has N total available channels, recorded in a hash. The total number
68             of channels per connection is negotiated via the initial AMQP Tune/TuneOk sequence on
69             connection.
70              
71             We also maintain lists:
72              
73             =over 4
74              
75             =item * Unassigned channel - these are channels which were in use and have now been released.
76              
77             =item * Closed channel - any time a channel is closed, the ID is pushed onto this list so we can reopen it later without needing to scan the hash, contains arrayrefs of [$mq_conn, $id]
78              
79             =back
80              
81             Highest-assigned ID is also recorded per connection.
82              
83             if(have unassigned) {
84             return shift unassigned
85             } elsif(have closed) {
86             my ($mq, $id) = @{ shift closed };
87             return $mq->open_channel($id)
88             } elsif(my $next_id = $mq->next_id) {
89             return $mq->open_channel($next_id)
90             } else {
91            
92             }
93              
94             Calling methods on the channel proxy will establish
95             a cycle for the duration of the pending request.
96             This cycle will not be resolved until after all
97             the callbacks have completed for a given request.
98              
99             The channel object does not expose any methods that allow
100             altering QoS or other channel state settings. These must be
101             requested on channel assignment. This does not necessarily
102             mean that any QoS change will require allocation of a new
103             channel.
104              
105             Bypassing the proxy object to change QoS flags is not recommended.
106              
107             =head2 Connection pool
108              
109             Connections are established on demand.
110              
111             =head1 METHODS
112              
113             =cut
114              
# Applies configuration to the manager.
#
# The manager-specific settings channel_retry_count and connect_timeout are
# removed from the argument list and stored on the instance when the caller
# supplied them (an explicit undef is stored as well). Everything else is
# passed on to the parent class' configure.
sub configure {
    my ($self, %args) = @_;
    foreach my $setting (qw(channel_retry_count connect_timeout)) {
        next unless exists $args{$setting};
        $self->{$setting} = delete $args{$setting};
    }
    $self->SUPER::configure(%args);
}
122              
123             =head2 request_channel
124              
125             Attempts to assign a channel with the given QoS settings.
126              
127             Available QoS settings are:
128              
129             =over 4
130              
131             =item * prefetch_count - number of messages that can be delivered at a time
132              
133             =item * prefetch_size - total size of messages allowed before acknowledging
134              
135             =item * confirm_mode - explicit publish ack
136              
137             =back
138              
139             Confirm mode isn't really QoS but it fits in with the others since it modifies
140             the channel state (and once enabled, cannot be disabled without closing and
141             reopening the channel).
142              
143             Will resolve to a L<Net::Async::AMQP::ConnectionManager::Channel> instance on success.
144              
145             =cut
146              
# Returns a Future which resolves to a
# Net::Async::AMQP::ConnectionManager::Channel proxy for a channel that has
# the requested QoS settings (%args) applied.
#
# Sources are tried in this order:
#   1. a released channel cached under the same QoS key
#   2. the ID of a previously closed channel, reopened on its connection
#   3. a new channel on whatever connection request_connection hands out
#
# Dies when a shutdown is in progress.
sub request_channel {
    my ($self, %args) = @_;

    die "We are shutting down" if $self->{shutdown_future};

    # Assign channel with matching QoS if available
    my $k = $self->key_for_args(\%args);
    if(exists $self->{channel_by_key}{$k} && @{$self->{channel_by_key}{$k}}) {
        my $cached = shift @{$self->{channel_by_key}{$k}};

        # A cached channel that has left the loop, is closed or is closing is
        # dropped; start again so the next candidate (or a new channel) is used.
        my $usable = $cached->loop && !$cached->is_closed && !$cached->{closing};
        return $self->request_channel(%args) unless $usable;

        $self->debug_printf("Assigning %d from by_key cache", $cached->id);
        my $proxy = Net::Async::AMQP::ConnectionManager::Channel->new(
            channel => $cached,
            manager => $self,
        );
        return Future->wrap($proxy);
    }

    # Nothing suitable is cached, so whichever route provides the channel,
    # QoS has to be applied afterwards.
    my $f;

    if($self->can_reopen_channels && exists $self->{closed_channel} && @{$self->{closed_channel}}) {
        # Reuse the ID of a closed channel before allocating a new one
        my $entry = shift @{$self->{closed_channel}};
        my ($mq, $id) = @$entry;
        $self->debug_printf("Reopening closed channel %d", $id);
        $f = $mq->open_channel(channel => $id);
    } else {
        # One attempt: obtain a connection and open a channel on it
        my $attempt = sub {
            $self->request_connection->then(sub {
                my $mq = shift;
                call {
                    # Spare ID on this connection - open the channel here
                    if(my $id = $mq->next_channel) {
                        return $mq->open_channel(channel => $id);
                    }

                    # No spare IDs: remember that, so the next request does
                    # not pick this MQ connection again
                    $self->mark_connection_full($mq->amqp);

                    # Just in case...
                    delete $self->{pending_connection};

                    # Failing is safe here, the surrounding retry loop should
                    # get a different MQ connection on its next iteration
                    Future->fail(channel => 'no spare channels on connection');
                }
            });
        };

        # Retry until an attempt succeeds or the channel_retry_count limit is
        # exceeded; an undef limit means retry indefinitely
        my $failures = 0;
        $f = try_repeat {
            $attempt->()
        } until => sub {
            my $trial = shift;
            return 1 if $trial->is_done;
            my $limit = $self->channel_retry_count;
            return 0 unless defined $limit;
            return ++$failures > $limit ? 1 : 0;
        };
    }

    # Apply our QoS on the channel if we ever get one
    my $configured = $f->then(sub {
        my $ch = shift;
        die "no channel provided?" unless $ch;
        call {
            # Get told when the channel closes so its ID can be recycled
            $ch->bus->subscribe_to_event(
                close => $self->curry::weak::on_channel_close($ch),
            );
            $self->apply_qos($ch => %args)
        }
    })->set_label('Channel QoS');

    return $configured->transform(
        done => sub {
            my $ch = shift;
            # Remember the settings so release_channel can file the channel
            # under the right key later
            $self->{channel_args}{$ch->id} = \%args;
            $self->debug_printf("Assigning newly-created channel %d", $ch->id);
            Net::Async::AMQP::ConnectionManager::Channel->new(
                channel => $ch,
                manager => $self,
            )
        }
    );
}
237              
238             =head2 can_reopen_channels
239              
240             A constant which indicates whether we can reopen channels. The AMQP0.9.1
241             spec doesn't seem to explicitly allow this, but it works with RabbitMQ 3.4.3
242             (and probably older versions) so it's enabled by default.
243              
244             =cut
245              
# Constant: true, meaning the ID of a closed channel may be opened again.
sub can_reopen_channels {
    return 1;
}
247              
248             =head2 channel_retry_count
249              
250             Returns the channel retry count. The default is 10, call L</configure>
251             with undef to retry indefinitely, 0 to avoid retrying at all:
252              
253             # Keep trying until it works
254             $mq->configure(channel_retry_count => undef);
255             # Don't retry at all
256             $mq->configure(channel_retry_count => 0);
257              
258             =cut
259              
# Returns the configured retry count, installing the default of 10 the first
# time it is read. The default is keyed on the slot being absent rather than
# undefined, because an explicitly stored undef is a valid setting.
sub channel_retry_count {
    my ($self) = @_;
    $self->{channel_retry_count} = 10
        unless exists $self->{channel_retry_count};
    return $self->{channel_retry_count};
}
268              
269             =head2 connect_timeout
270              
271             Returns the current connection timeout. undef/zero means "no timeout".
272              
273             =cut
274              
# Accessor for the connect_timeout value stored on the instance (undef when
# it was never configured).
sub connect_timeout {
    my ($self) = @_;
    return $self->{connect_timeout};
}
276              
277             =head2 apply_qos
278              
279             Set QoS on the given channel.
280              
281             Expects the L<Net::Async::AMQP::Channel> object as the first
282             parameter, followed by the key/value pairs corresponding to
283             the desired QoS settings:
284              
285             =over 4
286              
287             =item * prefetch_count - number of messages that can be delivered before ACK
288             is required
289              
290             =back
291              
292             Returns a L<Future> which will resolve to the original
293             L<Net::Async::AMQP::Channel> instance.
294              
295             =cut
296              
# Applies each requested QoS setting to $ch, one at a time in sorted key
# order, by dispatching to the matching qos_<name> method on this object.
#
# Dies with "Unknown QoS setting ..." when no such method exists.
# Returns a Future labelled 'Apply QoS settings' that resolves to $ch.
sub apply_qos {
    my ($self, $ch, %args) = @_;
    my @settings = sort keys %args;

    my $applied = fmap_void {
        my $name = shift;
        my $value = $args{$name};
        my $handler = $self->can("qos_$name")
            or die "Unknown QoS setting $name (value $value)";
        $handler->($self, $ch, $name => $value);
    } foreach => \@settings;

    return $applied->transform(
        done => sub { $ch }
    )->set_label('Apply QoS settings');
}
313              
# QoS handler for prefetch_size: forwards the key/value pair to the channel's
# qos method and labels the result "Apply <key> QoS".
sub qos_prefetch_size {
    my ($self, $ch, $k, $v) = @_;
    my $pending = $ch->qos($k => $v);
    return $pending->set_label("Apply $k QoS");
}
320              
# QoS handler for prefetch_count: forwards the key/value pair to the
# channel's qos method and labels the result "Apply <key> QoS".
sub qos_prefetch_count {
    my ($self, $ch, $k, $v) = @_;
    my $pending = $ch->qos($k => $v);
    return $pending->set_label("Apply $k QoS");
}
327              
# QoS handler for confirm_mode.
#
# Parameters: the channel, then the setting name and its value as passed by
# apply_qos.
#
# A false value (confirm_mode => 0 or undef) now leaves the channel alone and
# returns an already-completed Future resolving to the channel. Previously
# the value was ignored, so asking for confirm_mode => 0 still enabled
# confirm mode.
#
# When called without a value at all (just $self and $ch) confirm mode is
# enabled, as before.
#
# Returns the Future from the channel's confirm_mode call, labelled
# "Apply confirm_mode QoS", when confirm mode is being enabled.
sub qos_confirm_mode {
    my ($self, $ch, $k, $v) = @_;

    # Only an explicitly supplied false value means "do not enable"
    return Future->done($ch) if @_ >= 4 && !$v;

    return $ch->confirm_mode(
    )->set_label("Apply confirm_mode QoS");
}
333              
334             =head2 request_connection
335              
336             Attempts to connect to one of the known AMQP servers.
337              
338             =cut
339              
# Returns a Future for a connection to one of the configured AMQP servers.
#
#   * an attempt already in progress is shared with the caller
#   * otherwise the first entry of available_connections is wrapped in a
#     Net::Async::AMQP::ConnectionManager::Connection proxy
#   * otherwise a new connection to a host chosen by next_host is started,
#     raced against connect_timeout when that is set
#
# Dies when shutting down, or when no hosts have been added.
sub request_connection {
    my ($self) = @_;
    die "We are shutting down" if $self->{shutdown_future};

    if(my $pending = $self->{pending_connection}) {
        $self->debug_printf("Requested connection and we have one pending, returning that");
        return $pending;
    }

    if(exists $self->{available_connections} && @{$self->{available_connections}}) {
        $self->debug_printf("Assigning existing connection");
        my $proxy = Net::Async::AMQP::ConnectionManager::Connection->new(
            amqp    => $self->{available_connections}[0],
            manager => $self,
        );
        return Future->wrap($proxy);
    }
    die "No connection details available" unless $self->{amqp_host};

    $self->debug_printf("New connection is required");
    my $timeout = $self->connect_timeout;

    # Start the attempt and publish it as the pending connection so that
    # concurrent callers share it
    my $attempt = $self->{pending_connection} = $self->connect(
        %{$self->next_host}
    )->on_ready(sub {
        delete $self->{pending_connection};
    })->transform(
        done => sub {
            my $mq = shift;

            # When the connection closes, forget everything that refers to it
            my $forget_connection = sub {
                my ($ev) = @_;
                eval { $ev->unsubscribe; };
                my $addr = Scalar::Util::refaddr($mq);
                my $is_this_mq = sub { Scalar::Util::refaddr($_[0]) eq $addr };

                # The connection itself, whichever list holds it ...
                List::UtilsBy::extract_by {
                    $is_this_mq->($_)
                } @{$self->{available_connections}};
                List::UtilsBy::extract_by {
                    $is_this_mq->($_)
                } @{$self->{full_connections}};

                # ... the closed channel IDs stashed for it ...
                List::UtilsBy::extract_by {
                    $is_this_mq->($_->[0])
                } @{$self->{closed_channel}};

                # ... and released channels waiting in the by-key cache
                for my $k (sort keys %{$self->{channel_by_key}}) {
                    List::UtilsBy::extract_by {
                        $is_this_mq->($_->amqp)
                    } @{$self->{channel_by_key}{$k}};
                }
            };
            $mq->bus->subscribe_to_event(close => $forget_connection);

            my $conn = Net::Async::AMQP::ConnectionManager::Connection->new(
                amqp    => $mq,
                manager => $self,
            );
            push @{$self->{available_connections}}, $mq;
            $conn
        }
    )->set_label('Connect to MQ server');

    # Only race against a timer when a timeout is configured; when the timer
    # fails, a still-pending attempt is cancelled
    my @deadline;
    if($timeout) {
        push @deadline, $self->loop->timeout_future(
            after => $self->connect_timeout,
        )->on_fail(sub {
            $self->{pending_connection}->cancel if $self->{pending_connection} && !$self->{pending_connection}->is_ready;
        });
    }

    return retain_future(
        Future->wait_any($attempt, @deadline)->on_ready(sub {
            delete $self->{pending_connection}
        })
    );
}
423              
424             =head2 next_host
425              
426             Returns the next AMQP host.
427              
428             =cut
429              
# Picks one of the stored host definitions at random and returns it (the
# hashref recorded by add).
sub next_host {
    my ($self) = @_;
    my $hosts = $self->{amqp_host};
    my $pick = int rand scalar @$hosts;
    return $hosts->[$pick];
}
434              
435             =head2 connect
436              
437             Attempts a connection to an AMQP host.
438              
439             =cut
440              
# Creates a new Net::Async::AMQP instance, adds it as a child of this
# manager and starts connecting with the given arguments.
#
# heartbeat and max_channels are taken out of %args and applied through the
# instance's configure (heartbeat is passed as heartbeat_interval); port
# falls back to 5672 when missing or false. Whatever $amqp->connect returns
# is returned to the caller.
#
# Dies when a shutdown is in progress.
sub connect {
    my ($self, %args) = @_;
    die "We are shutting down" if $self->{shutdown_future};

    my $amqp = Net::Async::AMQP->new;
    $self->add_child($amqp);

    if(exists $args{heartbeat}) {
        $amqp->configure(heartbeat_interval => delete $args{heartbeat});
    }
    if(exists $args{max_channels}) {
        $amqp->configure(max_channels => delete $args{max_channels});
    }

    $args{port} ||= 5672;
    return $amqp->connect(%args);
}
454              
455             =head2 mark_connection_full
456              
457             Indicate that this connection has already allocated all available
458             channels.
459              
460             =cut
461              
# Moves $mq from the available list to the full list: whatever extract_conn
# removes from available_connections is appended to full_connections.
# Returns the manager.
sub mark_connection_full {
    my ($self, $mq) = @_;
    my @moved = $self->extract_conn($mq, $self->{available_connections});
    push @{$self->{full_connections}}, @moved;
    return $self;
}
471              
# Removes every entry of the array referenced by $stash that is the same
# object as $conn (compared by reference address) and returns the removed
# entries.
sub extract_conn {
    my ($self, $conn, $stash) = @_;
    my @removed = List::UtilsBy::extract_by {
        Scalar::Util::refaddr($conn) == Scalar::Util::refaddr($_)
    } @$stash;
    return @removed;
}
479              
480             =head2 key_for_args
481              
482             Returns a key that represents the given arguments.
483              
484             =cut
485              
# Builds the cache key for a set of QoS arguments: "name=value" pairs in
# sorted key order, joined with commas. An empty or missing hashref yields
# the empty string.
#
# Undefined values are rendered as an empty value ("name="). That is the same
# key as before, but no longer emits "uninitialized value" warnings, and a
# missing $args is handled explicitly rather than through autovivification.
sub key_for_args {
    my ($self, $args) = @_;
    return '' unless $args;
    return join ',', map {
        my $value = defined $args->{$_} ? $args->{$_} : '';
        "$_=$value"
    } sort keys %$args;
}
490              
491             =head2 on_channel_close
492              
493             Called when one of our channels has been closed.
494              
495             =cut
496              
# Event handler invoked when one of our channels closes.
#
# Unsubscribes the event, then - provided the channel's connection is still
# one we track - records [connection, channel ID] in closed_channel for
# reuse and moves the connection from the full list back to the available
# list, since it has a spare channel again.
#
# Dies if the channel has no AMQP connection.
sub on_channel_close {
    my ($self, $ch, $ev, %args) = @_;
    $self->debug_printf("channel closure: %s", join ' ', @_);

    # Channel closure only happens once per channel
    eval { $ev->unsubscribe; };

    $self->debug_printf("Adding closed channel %d back to the available list", $ch->id);
    my $amqp = $ch->amqp;
    die "This channel (" . $ch->id . ") has no AMQP connection" unless $amqp;

    # Nothing to recycle when the parent connection is already gone
    return unless $self->connection_valid($amqp);

    push @{$self->{closed_channel}}, [ $amqp, $ch->id ];

    my @reopened = $self->extract_conn($amqp, $self->{full_connections});
    push @{$self->{available_connections}}, @reopened;
}
518              
519             =head2 release_channel
520              
521             Releases the given channel back to our channel pool.
522              
523             =cut
524              
# Puts a channel back into the pool, filed under the key for the QoS
# settings recorded for its ID in channel_args. Channels without a
# connection, or whose connection is no longer tracked, are ignored.
# Returns the manager in every case.
#
# NOTE(review): channel_args is looked up by channel ID alone - confirm that
# IDs cannot collide between different connections.
sub release_channel {
    my ($self, $ch) = @_;
    return $self if !$ch || !$ch->amqp || !$self->connection_valid($ch->amqp);

    $self->debug_printf("Releasing channel %d", $ch->id);
    my $settings = $self->{channel_args}{$ch->id};
    my $key = $self->key_for_args($settings);
    push @{$self->{channel_by_key}{$key}}, $ch;
    return $self;
}
535              
536             =head2 connection_valid
537              
538             Returns true if this connection is one we know about, false if it's
539             closed or otherwise not usable.
540              
541             =cut
542              
# Returns 1 when $amqp is the same object (by reference address) as an entry
# of available_connections or full_connections, 0 otherwise.
sub connection_valid {
    my ($self, $amqp) = @_;
    my $wanted = Scalar::Util::refaddr($amqp);
    my $hits = grep {
        Scalar::Util::refaddr($_) eq $wanted
    } @{$self->{available_connections}}, @{$self->{full_connections}};
    return $hits ? 1 : 0;
}
552              
553             =head2 add
554              
555             Adds connection details for an AMQP server to the pool.
556              
557             =cut
558              
# Registers the connection details (%args) for one AMQP server by appending
# them, as a hashref, to amqp_host. Returns the new number of known hosts.
sub add {
    my ($self, %args) = @_;
    my $hosts = $self->{amqp_host} ||= [];
    return push @$hosts, \%args;
}
563              
564             =head2 exch
565              
566             =cut
567              
# Returns the cached result for the named exchange when there is one;
# otherwise requests a channel, declares the exchange on it and caches the
# resulting Future under that name.
sub exch {
    my ($self, $exch) = @_;
    my $cache = $self->{exchange} ||= {};
    return $cache->{$exch} if exists $cache->{$exch};

    my $declared = $self->request_channel->then(sub {
        my $ch = shift;
        $ch->declare_exchange($exch)
    });
    return $cache->{$exch} = $declared;
}
578              
# Returns the cached result for the named queue when there is one; otherwise
# requests a channel, declares the queue on it and caches the resulting
# Future under that name.
sub queue {
    my ($self, $q) = @_;
    my $cache = $self->{queue} ||= {};
    return $cache->{$q} if exists $cache->{$q};

    my $declared = $self->request_channel->then(sub {
        my $ch = shift;
        $ch->declare_queue($q)
    });
    return $cache->{$q} = $declared;
}
589              
590             =head2 release_connection
591              
592             Releases a connection.
593              
594             Doesn't really do anything.
595              
596             =cut
597              
# Called when a connection is released; only writes a debug message.
sub release_connection {
    my $self = shift;
    my $mq = shift;
    return $self->debug_printf("Releasing connection %s", $mq);
}
602              
# Number of connections currently tracked: available plus full.
sub connection_count {
    my ($self) = @_;
    my $available = scalar @{$self->{available_connections}};
    my $full = scalar @{$self->{full_connections}};
    return $available + $full;
}
607              
# Hook called with the loop; makes sure both connection lists exist, leaving
# already-present lists untouched. Returns the full_connections arrayref.
sub _add_to_loop {
    my ($self, $loop) = @_;
    for my $list (qw(available_connections full_connections)) {
        $self->{$list} ||= [];
    }
    return $self->{full_connections};
}
613              
# Closes every connection the manager tracks and returns a Future that
# completes once all of the close requests have finished.
#
# While that Future is pending it is stored as shutdown_future, which makes
# the request/connect methods refuse new work; it is removed again when the
# shutdown completes.
#
# Dies if a shutdown is already in progress.
#
# Changes from the previous implementation:
#   * connections in full_connections are closed too; before, only
#     available_connections were closed
#   * shutdown_future is stored before the cleanup callback is attached.
#     Previously an already-complete wait_all future ran the cleanup first,
#     and the later assignment left the manager marked as shutting down for
#     good
#   * missing connection lists are treated as empty
sub shutdown {
    my $self = shift;
    $self->debug_printf("Shutdown started");
    die "Shutdown already in progress?" if $self->{shutdown_future};
    my $start = [Time::HiRes::gettimeofday];

    my @connections = map {
        @{ $self->{$_} || [] }
    } qw(available_connections full_connections);

    my $closing = Future->wait_all(
        map { $_->close } @connections
    );

    # Store first: on_ready runs its callback immediately on a completed future
    $self->{shutdown_future} = $closing;
    return $closing->on_ready(sub {
        delete $self->{shutdown_future};
    })->on_done(sub {
        $self->debug_printf("All connections closed - elapsed %.3fs", Time::HiRes::tv_interval($start, [Time::HiRes::gettimeofday]));
    });
}
627              
628             1;
629              
630             __END__