File Coverage

blib/lib/Net/Async/AMQP.pm
Criterion Covered Total %
statement 10 12 83.3
branch n/a
condition n/a
subroutine 4 4 100.0
pod n/a
total 14 16 87.5


line stmt bran cond sub pod time code
1             package Net::Async::AMQP;
2             # ABSTRACT: IO::Async support for the AMQP protocol
3 10     10   151061 use strict;
  10         13  
  10         265  
4 10     10   33 use warnings;
  10         11  
  10         230  
5              
6 10     10   2493 use parent qw(IO::Async::Notifier);
  10         1441  
  10         41  
7              
8             our $VERSION = '2.000';
9              
10             =head1 NAME
11              
12             Net::Async::AMQP - provides client interface to AMQP using L
13              
14             =head1 VERSION
15              
16             version 2.000
17              
18             =head1 SYNOPSIS
19              
20             use IO::Async::Loop;
21             use Net::Async::AMQP;
22             my $loop = IO::Async::Loop->new;
23             $loop->add(my $amqp = Net::Async::AMQP->new);
24             $amqp->connect(
25             host => 'localhost',
26             user => 'guest',
27             pass => 'guest',
28             )->get;
29              
30             =head1 DESCRIPTION
31              
32             Does AMQP things. Note that the API may change before the stable 1.000
33             release - L are listed below if you want
34             to evaluate other options.
35              
36             If you want a higher-level API which manages channels and connections, try
37             L.
38              
39             Examples are in the C directory.
40              
41             =head2 AMQP support
42              
43             The following AMQP features are supported:
44              
45             =over 4
46              
47             =item * Queue declare, bind, delete
48              
49             =item * Exchange declare, delete
50              
51             =item * Consumer setup and cancellation
52              
53             =item * Message publishing
54              
55             =item * Explicit ACK
56              
57             =item * QoS
58              
59             =item * SSL
60              
61             =back
62              
63             =head2 RabbitMQ-specific features
64              
65             RabbitMQ provides some additional features:
66              
67             =over 4
68              
69             =item * Exchange-to-exchange binding
70              
71             =item * Server flow control notification
72              
73             =item * Consumer cancellation notification
74              
75             =item * Reject
76              
77             =item * TTL for message expiry
78              
79             =item * 255-level priorities
80              
81             =back
82              
83             =head2 Missing features
84              
85             The following features aren't currently implemented - raise a request via RT or by email (L)
86             if you want any of these:
87              
88             =over 4
89              
90             =item * Transactions
91              
92             =item * Flow control
93              
94             =item * SASL auth
95              
96             =back
97              
98             This implementation is designed to handle many simultaneous channels and connections. If you just want a
99             single consumer/publisher, one of the librabbitmq-c implementations may be sufficient.
100              
101             =cut
102              
103 10     10   74040 use Net::AMQP;
  0            
  0            
104             use Net::AMQP::Common qw(:all);
105              
106             use Future;
107             use curry::weak;
108             use Class::ISA ();
109             use List::Util qw(min);
110             use List::UtilsBy qw(extract_by);
111             use File::ShareDir ();
112             use Time::HiRes ();
113             use Scalar::Util qw(weaken);
114             use Mixin::Event::Dispatch::Bus;
115              
116             =head1 CONSTANTS
117              
118             =head2 AUTH_MECH
119              
120             Defines the mechanism used for authentication. Currently only AMQPLAIN
121             is supported.
122              
123             =cut
124              
125             use constant AUTH_MECH => 'AMQPLAIN';
126              
127             =head2 PAYLOAD_HEADER_LENGTH
128              
129             Length of header used in payload messages. Defined by the AMQP standard
130             as 8 bytes.
131              
132             =cut
133              
134             use constant PAYLOAD_HEADER_LENGTH => 8;
135              
136             =head2 MAX_FRAME_SIZE
137              
138             Largest amount of data we'll attempt to send in a single frame. Actual
139             frame limit will be negotiated with the remote server. Defaults to 262144.
140              
141             =cut
142              
143             use constant MAX_FRAME_SIZE => 262144;
144              
145             =head2 MAX_CHANNELS
146              
147             Maximum number of channels to request. Defaults to the AMQP limit (65535).
148             Attempting to set this any higher will not end well, it's an unsigned 16-bit
149             value.
150              
151             =cut
152              
153             use constant MAX_CHANNELS => 65535;
154              
155             =head2 HEARTBEAT_INTERVAL
156              
157             Interval in seconds between heartbeat frames, zero to disable. Can be
158             overridden by C in the environment, default
159             is 0 (disabled).
160              
161             =cut
162              
163             use constant HEARTBEAT_INTERVAL => $ENV{PERL_AMQP_HEARTBEAT_INTERVAL} // 0;
164              
165             use Net::Async::AMQP::Channel;
166             use Net::Async::AMQP::Queue;
167             use Net::Async::AMQP::Utils;
168              
169             =head1 PACKAGE VARIABLES
170              
171             =head2 $XML_SPEC
172              
173             This defines the path to the AMQP XML spec, which L uses
174             to create methods and handlers for the appropriate version of the MQ
175             protocol.
176              
177             Defaults to an extended version of the 0.9.1 protocol as used by RabbitMQ,
178             this is found in the C distribution sharedir (see
179             L).
180              
181             Normally, you should be able to ignore this. If you want to load an alternative
182             spec, note that (a) this is global, rather than per-instance, (b) it needs to
183             be set before you C this module.
184              
185             BEGIN { $Net::Async::AMQP::XML_SPEC = '/tmp/amqp.xml' }
186             use Net::Async::AMQP;
187              
188             Once loaded, this module will not attempt to apply the spec again.
189              
190             =cut
191              
192             our $XML_SPEC;
193             our $SPEC_LOADED;
194             BEGIN {
195             $XML_SPEC //= File::ShareDir::dist_file(
196             'Net-Async-AMQP',
197             'amqp0-9-1.extended.xml'
198             );
199              
200             # Load the appropriate protocol definitions. RabbitMQ uses a
201             # modified version of AMQP 0.9.1
202             Net::AMQP::Protocol->load_xml_spec($XML_SPEC) unless $SPEC_LOADED++;
203             }
204              
205             =head1 %CONNECTION_DEFAULTS
206              
207             The default parameters to use for L. Changing these values is permitted,
208             but do not attempt to delete from or add any entries to the hash.
209              
210             Passing parameters directly to L is much safer, please do that instead.
211              
212             =cut
213              
214             our %CONNECTION_DEFAULTS = (
215             port => 5672,
216             host => 'localhost',
217             user => 'guest',
218             pass => 'guest',
219             );
220              
221             =head1 METHODS
222              
223             =cut
224              
225             =head2 configure
226              
227             Set up variables. Takes the following optional named parameters:
228              
229             =over 4
230              
231             =item * heartbeat_interval - (optional) interval between heartbeat messages,
232             default is set by the L constant
233              
234             =item * max_channels - how many channels to allow on this connection,
235             default is defined by the L constant
236              
237             =back
238              
239             Returns the new instance.
240              
241             =cut
242              
243             sub configure {
244             my ($self, %args) = @_;
245             for (qw(heartbeat_interval max_channels)) {
246             $self->{$_} = delete $args{$_} if exists $args{$_}
247             }
248             $self->SUPER::configure(%args)
249             }
250              
251             =head2 bus
252              
253             Event bus. Used for sharing global events such as connection closure.
254              
255             =cut
256              
257             sub bus { $_[0]->{bus} ||= Mixin::Event::Dispatch::Bus->new }
258              
259             =head2 connect
260              
261             Takes the following parameters:
262              
263             =over 4
264              
265             =item * port - the AMQP port, defaults to 5672, can be a service name if preferred
266              
267             =item * host - host to connect to, defaults to localhost
268              
269             =item * local_host - our local IP to connect from
270              
271             =item * user - which user to connect as, defaults to guest
272              
273             =item * pass - the password for this user, defaults to guest
274              
275             =item * ssl - true if you want to connect over SSL
276              
277             =item * SSL_* - SSL-specific parameters, see L and L for details
278              
279             =back
280              
281             Returns $self.
282              
283             =cut
284              
285             sub connect {
286             my $self = shift;
287             my %args = @_;
288              
289             die 'no loop' unless my $loop = $self->loop;
290              
291             my $f = $self->loop->new_future;
292              
293             # Apply defaults
294             $self->{$_} = $args{$_} //= $CONNECTION_DEFAULTS{$_} for keys %CONNECTION_DEFAULTS;
295              
296             # Remember our event callbacks so we can unsubscribe
297             my $connected;
298             my $close;
299              
300             # Clean up once we succeed/fail
301             $f->on_ready(sub {
302             $self->bus->unsubscribe_from_event(close => $close) if $close;
303             $self->bus->unsubscribe_from_event(connected => $connected) if $connected;
304             undef $close;
305             undef $connected;
306             undef $self;
307             undef $f;
308             });
309              
310             # One-shot event on connection
311             $self->bus->subscribe_to_event(connected => $connected = sub {
312             $f->done($self) unless $f->is_ready;
313             });
314             # Also pick up connection termination
315             $self->bus->subscribe_to_event(close => $close = sub {
316             $f->fail(connect => 'Remote closed connection') unless $f->is_ready;
317             });
318              
319             # Support SSL connection
320             require IO::Async::SSL if $args{ssl};
321             my $method = $args{ssl} ? 'SSL_connect' : 'connect';
322             $loop->$method(
323             host => $self->{host},
324             # local_host can be used to send from a different source address,
325             # sometimes useful for routing purposes or loadtesting
326             (exists $args{local_host} ? (local_host => $args{local_host}) : ()),
327             service => $self->{port},
328             socktype => 'stream',
329              
330             on_stream => $self->curry::on_stream(\%args),
331              
332             on_resolve_error => $f->curry::fail('resolve'),
333             on_connect_error => $f->curry::fail('connect'),
334             ($args{ssl}
335             ? (on_ssl_error => $f->curry::fail('ssl'))
336             : ()
337             ),
338             (map {; $_ => $args{$_} } grep /^SSL/, keys %args)
339             );
340             $f;
341             }
342              
343             =head2 on_stream
344              
345             Called once the underlying TCP connection has been established.
346              
347             Returns nothing of importance.
348              
349             =cut
350              
351             sub on_stream {
352             my ($self, $args, $stream) = @_;
353             $self->debug_printf("Stream received");
354             $self->{stream} = $stream;
355             $stream->configure(
356             on_read => $self->curry::weak::on_read,
357             );
358             $self->add_child($stream);
359             $self->apply_heartbeat_timer if $self->heartbeat_interval;
360             $self->post_connect(%$args);
361             return;
362             }
363              
364             sub dump_frame {
365             my ($self, $pkt) = @_;
366             my ($type) = unpack 'C1', substr $pkt, 0, 1, '';
367             printf "Type: %02x (%s)\n", $type, {
368             1 => 'Method',
369             }->{$type};
370              
371             my ($chan) = unpack 'n1', substr $pkt, 0, 2, '';
372             printf "Channel: %d\n", $chan;
373              
374             my ($len) = unpack 'N1', substr $pkt, 0, 4, '';
375             printf "Length: %d bytes\n", $len;
376              
377             if($type == 1) {
378             my ($class, $method) = unpack 'n1n1', substr $pkt, 0, 4, '';
379             printf "Class: %s\n", $class;
380             printf "Method: %s\n", $method;
381             }
382             }
383              
384             =head2 on_read
385              
386             Called whenever there's data available to be read.
387              
388             =cut
389              
390             sub on_read {
391             my ($self, $stream, $buffref, $eof) = @_;
392              
393             $self->last_frame_time(Time::HiRes::time);
394              
395             # As each frame is parsed it will be removed from the buffer
396             $self->process_frame($_) for Net::AMQP->parse_raw_frames($buffref);
397             $self->on_closed if $eof;
398             return 0;
399             }
400              
401             =head2 on_closed
402              
403             Called when the TCP connection is closed.
404              
405             =cut
406              
407             sub on_closed {
408             my $self = shift;
409             my $reason = shift // 'unknown';
410             $self->debug_printf("Connection closed [%s]", $reason);
411             delete $self->{connected};
412              
413             for my $ch (grep $_, values %{$self->{channel_by_id}}) {
414             $ch->bus->invoke_event(
415             'close',
416             code => undef,
417             reason => 'Connection closed: ' . $reason,
418             );
419             $self->channel_closed($ch->id);
420             }
421              
422             # Clean up any mismatching entries in the Future map
423             $_->cancel for grep !$_->is_ready, values %{$self->{channel_map}};
424             $self->{channel_map} = {};
425              
426             $self->stream->close if $self->stream;
427             for (qw(stream heartbeat_send_timer heartbeat_receive_timer)) {
428             $self->debug_printf("Remove child %s", $_);
429             (delete $self->{$_})->remove_from_parent if $self->{$_};
430             }
431             $self->bus->invoke_event(close => $reason)
432             }
433              
434             =head2 post_connect
435              
436             Sends initial startup header and applies listener for the C< Connection::Start > message.
437              
438             Returns $self.
439              
440             =cut
441              
442             sub post_connect {
443             my $self = shift;
444             my %args = @_;
445              
446             my %client_prop = (
447             platform => $args{platform} // 'Perl/NetAsyncAMQP',
448             product => $args{product} // __PACKAGE__,
449             information => $args{information} // 'http://search.cpan.org/perldoc?Net::Async::AMQP',
450             version => $args{version} // $VERSION,
451             ($args{client_properties} ? %{$args{client_properties}} : ()),
452             );
453              
454             $self->push_pending(
455             'Connection::Start' => sub {
456             my ($self, $frame) = @_;
457             my $method_frame = $frame->method_frame;
458             my @mech = split ' ', $method_frame->mechanisms;
459             die "Auth mechanism " . AUTH_MECH . " not supported, unable to continue - options were: @mech" unless grep $_ eq AUTH_MECH, @mech;
460             my $output = Net::AMQP::Frame::Method->new(
461             channel => 0,
462             method_frame => Net::AMQP::Protocol::Connection::StartOk->new(
463             client_properties => \%client_prop,
464             mechanism => AUTH_MECH,
465             locale => $args{locale} // 'en_GB',
466             response => {
467             LOGIN => $args{user},
468             PASSWORD => $args{pass},
469             },
470             ),
471             );
472             $self->setup_tuning(%args);
473             $self->send_frame($output);
474             }
475             );
476              
477             # Send the initial header bytes. It'd be nice
478             # if we could use L
479             # for this, but it seems to be sending 1 for
480             # the protocol ID, and the revision number is
481             # before the major/minor version.
482             # $self->write(Net::AMQP::Protocol->header);
483             $self->write($self->header_bytes);
484             $self
485             }
486              
487             =head2 setup_tuning
488              
489             Applies listener for the Connection::Tune message, used for determining max frame size and heartbeat settings.
490              
491             Returns $self.
492              
493             =cut
494              
495             sub setup_tuning {
496             my $self = shift;
497             my %args = @_;
498             $self->push_pending(
499             'Connection::Tune' => sub {
500             my ($self, $frame) = @_;
501             my $method_frame = $frame->method_frame;
502             # Lowest value for frame max wins - our predef constant, or whatever the server suggests
503             $self->frame_max(my $frame_max = min $method_frame->frame_max, $self->MAX_FRAME_SIZE);
504             $self->channel_max(my $channel_max = $method_frame->channel_max || $self->max_channels || $self->MAX_CHANNELS);
505             $self->debug_printf("Remote says %d channels, will use %d", $method_frame->channel_max, $channel_max);
506             $self->{channel} = 0;
507             $self->send_frame(
508             Net::AMQP::Protocol::Connection::TuneOk->new(
509             channel_max => $channel_max,
510             frame_max => $frame_max,
511             heartbeat => $self->heartbeat_interval,
512             )
513             );
514             $self->open_connection(%args);
515             }
516             );
517             }
518              
519             =head2 open_connection
520              
521             Establish a new connection to a vhost - this is called after tuning is complete,
522             and must happen before any channel connections are attempted.
523              
524             Returns $self.
525              
526             =cut
527              
528             sub open_connection {
529             my $self = shift;
530             my %args = @_;
531             $self->setup_connection(%args);
532             $self->send_frame(
533             Net::AMQP::Frame::Method->new(
534             method_frame => Net::AMQP::Protocol::Connection::Open->new(
535             virtual_host => $args{vhost} // '/',
536             capabilities => '',
537             insist => 1,
538             ),
539             )
540             );
541             $self
542             }
543              
544             =head2 setup_connection
545              
546             Applies listener for the Connection::OpenOk message, which triggers the
547             C event.
548              
549             Returns $self.
550              
551             =cut
552              
553             sub setup_connection {
554             my $self = shift;
555             my %args = @_;
556             $self->push_pending(
557             'Connection::OpenOk' => sub {
558             my ($self, $frame) = @_;
559             my $method_frame = $frame->method_frame;
560             $self->debug_printf("OpenOk received");
561             $self->connected->done;
562             $self->bus->invoke_event(connected =>);
563             }
564             );
565             $self
566             }
567              
568             =head2 connected
569              
570             Returns a L which will resolve when the MQ connection is ready
571             for use.
572              
573             =cut
574              
575             sub connected {
576             my ($self) = @_;
577             $self->{connected} ||= $self->future(set_label => 'MQ connection');
578             }
579              
580             =head2 next_channel
581              
582             Returns the next available channel ready for L.
583             Note that whatever it reports will be completely wrong if you've
584             manually specified a channel anywhere, so don't do that.
585              
586             If channels have been closed on this connection, those IDs will be
587             reused in preference to handing out a new ID.
588              
589             =cut
590              
591             sub next_channel {
592             my $self = shift;
593             $self->{channel} //= 0;
594             return shift @{$self->{available_channel_id}} if @{$self->{available_channel_id} ||= [] };
595             return undef if $self->{channel} >= $self->channel_max;
596             ++$self->{channel}
597             }
598              
599             =head2 create_channel
600              
601             Returns a new ::Channel instance, populating the map of assigned channels in the
602             process. Takes a single parameter:
603              
604             =over 4
605              
606             =item * $id - the channel ID, can be undef to assign via L
607              
608             =back
609              
610             =cut
611              
612             sub create_channel {
613             my ($self, $id) = @_;
614             $id //= $self->next_channel;
615             die "No channel available" unless $id;
616              
617             my $f = $self->loop->new_future;
618             $self->{channel_map}{$id} = $f;
619             $self->add_child(
620             my $c = Net::Async::AMQP::Channel->new(
621             amqp => $self,
622             future => $f,
623             id => $id,
624             )
625             );
626             $self->{channel_by_id}{$id} = $c;
627             $self->debug_printf("Record channel %d as %s", $id, $c);
628             return $c;
629             }
630              
631             =head2 open_channel
632              
633             Opens a new channel.
634              
635             Returns the new L instance.
636              
637             =cut
638              
639             sub open_channel {
640             my $self = shift;
641             my %args = @_;
642             my $channel;
643             if($args{channel}) {
644             $channel = delete $args{channel};
645             extract_by { $channel == $_ } @{$self->{available_channel_id}} if exists $self->{available_channel_id};
646             } else {
647             $channel = $self->next_channel;
648             }
649             die "Channel " . $channel . " exists already: " . $self->{channel_map}{$channel} if exists $self->{channel_map}{$channel};
650             my $c = $self->create_channel($channel);
651             my $f = $c->future;
652              
653             my $frame = Net::AMQP::Frame::Method->new(
654             method_frame => Net::AMQP::Protocol::Channel::Open->new,
655             );
656             $frame->channel($channel);
657             $c->push_pending(
658             'Channel::OpenOk' => sub {
659             my ($c, $frame) = @_;
660             my $f = $self->{channel_map}{$frame->channel};
661             $f->done($c) unless $f->is_ready;
662             }
663             );
664             $self->send_frame($frame);
665             return $f;
666             }
667              
668             =head2 close
669              
670             Close the connection.
671              
672             Returns a L which will resolve with C<$self> when the connection is closed.
673              
674             =cut
675              
676             sub close {
677             my $self = shift;
678             my %args = @_;
679              
680             $self->heartbeat_send_timer->stop if $self->heartbeat_send_timer;
681              
682             my $f = $self->loop->new_future;
683              
684             # We might end up with a connection shutdown rather
685             # than a clean Connection::Close response, so
686             # we need to handle both possibilities
687             my @handler;
688             $self->bus->subscribe_to_event(
689             @handler = (
690             close => sub {
691             my ($ev, $reason) = @_;
692             splice @handler;
693             eval { $ev->unsubscribe; };
694             return unless $f;
695             $f->done($reason) unless $f->is_ready;
696             weaken $f;
697             }
698             )
699             );
700              
701             my $frame = Net::AMQP::Frame::Method->new(
702             method_frame => Net::AMQP::Protocol::Connection::Close->new(
703             reply_code => $args{code} // 320,
704             reply_text => $args{reason} // 'Request connection close',
705             ),
706             );
707             $self->push_pending(
708             'Connection::CloseOk' => [ $f, $self ],
709             );
710             $self->send_frame($frame);
711              
712             # ... and make sure we clean up after ourselves
713             $f->on_ready(sub {
714             $self->bus->unsubscribe_from_event(
715             @handler
716             );
717             weaken $f if $f;
718             });
719             }
720              
721             =head2 channel_closed
722              
723             =cut
724              
725             sub channel_closed {
726             my ($self, $id) = @_;
727             my $f = delete $self->{channel_map}{$id}
728             or die "Had a close indication for channel $id but this channel is unknown";
729             $f->cancel unless $f->is_ready;
730             $self->remove_child(delete $self->{channel_by_id}{$id});
731              
732             # Record this ID as available for the next time we need to open a new channel
733             push @{$self->{available_channel_id}}, $id;
734             $self
735             }
736              
737             sub channel_by_id { my $self = shift; $self->{channel_by_id}{+shift} }
738              
739             =head2 next_pending
740              
741             Retrieves the next pending handler for the given incoming frame type (see L),
742             and calls it.
743              
744             Takes the following parameters:
745              
746             =over 4
747              
748             =item * $type - the frame type, such as 'Basic::ConnectOk'
749              
750             =item * $frame - the frame itself
751              
752             =back
753              
754             Returns $self.
755              
756             =cut
757              
758             sub next_pending {
759             my ($self, $type, $frame) = @_;
760             $self->debug_printf("Check next pending for %s", $type);
761              
762             if($type eq 'Connection::Close') {
763             $self->on_closed($frame->method_frame->reply_text);
764             return $self;
765             }
766              
767             if(my $next = shift @{$self->{pending}{$type} || []}) {
768             # We have a registered handler for this frame type. This usually
769             # means that we've sent a frame and are awaiting a response.
770             if(ref($next) eq 'ARRAY') {
771             my ($f, @args) = @$next;
772             $f->done(@args) unless $f->is_ready;
773             } else {
774             $next->($self, $frame, @_);
775             }
776             } else {
777             # It's quite possible we'll see unsolicited frames back from
778             # the server: these will typically be errors, connection close,
779             # or consumer cancellation if the consumer_cancel_notify
780             # option is set (RabbitMQ). We don't expect many so report
781             # them when in debug mode.
782             $self->debug_printf("We had no pending handlers for %s, raising as event", $type);
783             $self->bus->invoke_event(
784             unexpected_frame => $type, $frame
785             );
786             }
787             $self
788             }
789              
790             =head1 METHODS - Accessors
791              
792             =head2 host
793              
794             The current host.
795              
796             =cut
797              
798             sub host { shift->{host} }
799              
800             =head2 vhost
801              
802             Virtual host.
803              
804             =cut
805              
806             sub vhost { shift->{vhost} }
807              
808             =head2 port
809              
810             Port number. Usually 5672.
811              
812             =cut
813              
814             sub port { shift->{port} }
815              
816             =head2 user
817              
818             MQ user.
819              
820             =cut
821              
822             sub user { shift->{user} }
823              
824             =head2 frame_max
825              
826             Maximum number of bytes allowed in any given frame. This is the
827             value negotiated with the remote server.
828              
829             =cut
830              
831             sub frame_max {
832             my $self = shift;
833             return $self->{frame_max} unless @_;
834              
835             $self->{frame_max} = shift;
836             $self
837             }
838              
839             =head2 channel_max
840              
841             Maximum number of channels. This is whatever we ended up with after initial negotiation.
842              
843             =cut
844              
845             sub channel_max {
846             my $self = shift;
847             return $self->{channel_max} ||= $self->{max_channels} || $self->MAX_CHANNELS unless @_;
848              
849             $self->{channel_max} = shift;
850             $self
851             }
852              
853             sub max_channels { shift->{max_channels} }
854              
855             =head2 last_frame_time
856              
857             Timestamp of the last frame we received from the remote. Used for handling heartbeats.
858              
859             =cut
860              
861             sub last_frame_time {
862             my $self = shift;
863             return $self->{last_frame_time} unless @_;
864              
865             $self->{last_frame_time} = shift;
866             $self->heartbeat_receive_timer->reset if $self->heartbeat_receive_timer;
867             $self
868             }
869              
870             =head2 stream
871              
872             Returns the current L for the AMQP connection.
873              
874             =cut
875              
876             sub stream { shift->{stream} }
877              
878             =head2 incoming_message
879              
880             L for the current incoming message (received in two or more parts:
881             the header then all body chunks).
882              
883             =cut
884              
885             sub incoming_message { shift->{incoming_message} }
886              
887             =head1 METHODS - Internal
888              
889             The following methods are intended for internal use. They are documented
890             for completeness but should not normally be needed outside this library.
891              
892             =cut
893              
894             =head2 heartbeat_interval
895              
896             Current maximum interval between frames.
897              
898             =cut
899              
900             sub heartbeat_interval { shift->{heartbeat_interval} //= HEARTBEAT_INTERVAL }
901              
902             =head2 missed_heartbeats_allowed
903              
904             How many times we allow the remote to miss the frame-sending deadline in a row
905             before we give up and close the connection. Defined by the protocol, should be
906             3x heartbeats.
907              
908             =cut
909              
910             sub missed_heartbeats_allowed { 3 }
911              
912             =head2 apply_heartbeat_timer
913              
914             Enable both heartbeat timers.
915              
916             =cut
917              
918             sub apply_heartbeat_timer {
919             my $self = shift;
920             { # On expiry, will trigger a heartbeat send from us to the server
921             my $timer = IO::Async::Timer::Countdown->new(
922             delay => $self->heartbeat_interval,
923             on_expire => $self->curry::weak::send_heartbeat,
924             );
925             $self->add_child($timer);
926             $timer->start;
927             Scalar::Util::weaken($self->{heartbeat_send_timer} = $timer);
928             }
929             { # This timer indicates no traffic from the remote for 3*heartbeat
930             my $timer = IO::Async::Timer::Countdown->new(
931             delay => $self->missed_heartbeats_allowed * $self->heartbeat_interval,
932             on_expire => $self->curry::weak::handle_heartbeat_failure,
933             );
934             $self->add_child($timer);
935             $timer->start;
936             Scalar::Util::weaken($self->{heartbeat_receive_timer} = $timer);
937             }
938             $self
939             }
940              
941             =head2 reset_heartbeat
942              
943             Resets our side of the heartbeat timer.
944              
945             This is used to ensure we send data at least once every L
946             seconds.
947              
948             =cut
949              
950             sub reset_heartbeat {
951             my $self = shift;
952             return unless my $timer = $self->heartbeat_send_timer;
953              
954             $timer->reset;
955             }
956              
957              
958             =head2 heartbeat_receive_timer
959              
960             Timer for tracking frames we've received.
961              
962             =cut
963              
964             sub heartbeat_receive_timer { shift->{heartbeat_receive_timer} }
965              
966             =head2 heartbeat_send_timer
967              
968             Timer for tracking when we're due to send out something.
969              
970             =cut
971              
972             sub heartbeat_send_timer { shift->{heartbeat_send_timer} }
973              
974             =head2 handle_heartbeat_failure
975              
976             Called when heartbeats are enabled and we've had no response from the server for 3 heartbeat
977             intervals (see L). We'd expect some frame from the remote - even
978             if just a heartbeat frame - at least once every heartbeat interval so if this triggers then
979             we're likely dealing with a dead or heavily loaded server.
980              
981             This will invoke the L then close the connection.
982              
983             =cut
984              
985             sub handle_heartbeat_failure {
986             my $self = shift;
987             $self->debug_printf("Heartbeat timeout: no data received from server since %s, closing connection", $self->last_frame_time);
988              
989             $self->bus->invoke_event(
990             heartbeat_failure => $self->last_frame_time
991             );
992             $self->close;
993             }
994              
995             =head2 send_heartbeat
996              
997             Sends the heartbeat frame.
998              
999             =cut
1000              
1001             sub send_heartbeat {
1002             my $self = shift;
1003             $self->debug_printf("Sending heartbeat frame");
1004              
1005             # Heartbeat messages apply to the connection rather than
1006             # individual channels, so we use channel 0 to represent this
1007             $self->send_frame(
1008             Net::AMQP::Frame::Heartbeat->new,
1009             channel => 0,
1010             );
1011              
1012             # Ensure heartbeat timer is active for next time
1013             if(my $timer = $self->heartbeat_send_timer) {
1014             $timer->reset;
1015             $timer->start;
1016             }
1017             }
1018              
1019             =head2 push_pending
1020              
1021             Adds the given handler(s) to the pending handler list for the given type(s).
1022              
1023             Takes one or more of the following parameter pairs:
1024              
1025             =over 4
1026              
1027             =item * $type - the frame type, see L
1028              
1029             =item * $code - the coderef to call, will be invoked once as follows when a matching frame is received:
1030              
1031             $code->($self, $frame, @_)
1032              
1033             =back
1034              
1035             Returns C< $self >.
1036              
1037             =cut
1038              
1039             sub push_pending {
1040             my $self = shift;
1041             while(@_) {
1042             my ($type, $code) = splice @_, 0, 2;
1043             push @{$self->{pending}{$type}}, $code;
1044             }
1045             return $self;
1046             }
1047              
1048             =head2 remove_pending
1049              
1050             Removes a coderef from the pending event handler.
1051              
1052             Returns C< $self >.
1053              
1054             =cut
1055              
1056             sub remove_pending {
1057             my $self = shift;
1058             while(@_) {
1059             my ($type, $code) = splice @_, 0, 2;
1060             # This is the same as extract_by { $_ eq $code } @{$self->{pending}{$type}};,
1061             # but since we'll be calling it a lot might as well do it inline:
1062             splice
1063             @{$self->{pending}{$type}},
1064             $_,
1065             1 for grep {
1066             $self->{pending}{$type}[$_] eq $code
1067             } reverse 0..$#{$self->{pending}{$type}};
1068             }
1069             return $self;
1070             }
1071              
1072             =head2 write
1073              
1074             Writes data to the server.
1075              
1076             Returns a L which will resolve to an empty list when
1077             done.
1078              
1079             =cut
1080              
1081             sub write {
1082             my $self = shift;
1083             $self->stream->write(@_)
1084             }
1085              
1086             =head2 process_frame
1087              
1088             Process a single incoming frame.
1089              
1090             Takes the following parameters:
1091              
1092             =over 4
1093              
1094             =item * $frame - the L instance
1095              
1096             =back
1097              
1098             Returns $self.
1099              
1100             =cut
1101              
1102             sub process_frame {
1103             my ($self, $frame) = @_;
1104             $self->debug_printf("Received %s", amqp_frame_info($frame));
1105              
1106             my $frame_type = amqp_frame_type($frame);
1107              
1108             if($frame_type eq 'Heartbeat') {
1109             # Ignore these completely. Since we have the last frame update at the data-read
1110             # level, there's nothing for us to do here.
1111             $self->debug_printf("Heartbeat received");
1112              
1113             # A peer that receives an invalid heartbeat frame MUST raise a connection
1114             # exception with reply code 501 (frame error)
1115             $self->close(
1116             code => 501,
1117             reason => 'Frame error - heartbeat should have channel 0'
1118             ) if $frame->channel;
1119              
1120             return $self;
1121             } elsif(my $ch = $self->channel_by_id($frame->channel)) {
1122             $self->debug_printf("Processing frame %s on channel %d", $frame_type, $ch);
1123             return $self if $ch->next_pending($frame);
1124             }
1125              
1126             $self->debug_printf("Processing connection frame %s", $frame_type);
1127              
1128             $self->next_pending($frame_type, $frame);
1129              
1130             return $self;
1131             }
1132              
1133             =head2 split_payload
1134              
1135             Splits a message into separate frames.
1136              
1137             Takes the $payload as a scalar containing byte data, and the following parameters:
1138              
1139             =over 4
1140              
1141             =item * exchange - where we're sending the message
1142              
1143             =item * routing_key - other part of message destination
1144              
1145             =back
1146              
1147             Additionally, the following headers can be passed:
1148              
1149             =over 4
1150              
1151             =item * content_type
1152              
1153             =item * content_encoding
1154              
1155             =item * headers
1156              
1157             =item * delivery_mode
1158              
1159             =item * priority
1160              
1161             =item * correlation_id
1162              
1163             =item * reply_to
1164              
1165             =item * expiration
1166              
1167             =item * message_id
1168              
1169             =item * timestamp
1170              
1171             =item * type
1172              
1173             =item * user_id
1174              
1175             =item * app_id
1176              
1177             =item * cluster_id
1178              
1179             =back
1180              
1181             Returns list of frames suitable for passing to L.
1182              
1183             =cut
1184              
1185             sub split_payload {
1186             my $self = shift;
1187             my $payload = shift;
1188             my %opts = @_;
1189              
1190             # Get the original content length first
1191             my $payload_size = length $payload;
1192              
1193             my @body_frames;
1194             while (length $payload) {
1195             my $chunk = substr $payload, 0, $self->frame_max - PAYLOAD_HEADER_LENGTH, '';
1196             push @body_frames, Net::AMQP::Frame::Body->new(
1197             payload => $chunk
1198             );
1199             }
1200              
1201             return
1202             Net::AMQP::Protocol::Basic::Publish->new(
1203             map {; $_ => $opts{$_} } grep defined($opts{$_}), qw(ticket exchange routing_key mandatory immediate)
1204             ),
1205             Net::AMQP::Frame::Header->new(
1206             weight => $opts{weight} || 0,
1207             body_size => $payload_size,
1208             header_frame => Net::AMQP::Protocol::Basic::ContentHeader->new(
1209             map {; $_ => $opts{$_} } grep defined($opts{$_}), qw(
1210             content_type
1211             content_encoding
1212             headers
1213             delivery_mode
1214             priority
1215             correlation_id
1216             reply_to
1217             expiration
1218             message_id
1219             timestamp
1220             type
1221             user_id
1222             app_id
1223             cluster_id
1224             )
1225             ),
1226             ),
1227             @body_frames;
1228             }
1229              
1230             =head2 send_frame
1231              
1232             Send a single frame.
1233              
1234             Takes the $frame instance followed by these optional named parameters:
1235              
1236             =over 4
1237              
1238             =item * channel - which channel we should send on
1239              
1240             =back
1241              
1242             Returns a L which will resolve to an empty list
1243             when the frame has been written (this does not guarantee that the server has received it).
1244              
1245             =cut
1246              
1247             sub send_frame {
1248             my $self = shift;
1249             my $frame = shift;
1250             my %args = @_;
1251              
1252             # Apply defaults and wrap as required
1253             $frame = $frame->frame_wrap if $frame->isa("Net::AMQP::Protocol::Base");
1254             die "Frame has channel ID " . $frame->channel . " but we wanted " . $args{channel}
1255             if defined $frame->channel && defined $args{channel} && $frame->channel != $args{channel};
1256              
1257             $frame->channel($args{channel} // 0) unless defined $frame->channel;
1258              
1259             $self->debug_printf("Sending %s", amqp_frame_info($frame));
1260              
1261             # Get bytes to send across our transport
1262             my $data = $frame->to_raw_frame;
1263              
1264             # warn "Sending data: " . Dumper($frame) . "\n";
1265             $self->write(
1266             $data,
1267             )->on_done($self->curry::reset_heartbeat)
1268             }
1269              
1270             =head2 header_bytes
1271              
1272             Byte string representing the header bytes we should send on initial TCP connect.
1273             Net::AMQP uses AMQP\x01\x01\x09\x01, which does not appear to comply with AMQP 0.9.1
1274             section 4.2.2.
1275              
1276             =cut
1277              
1278             sub header_bytes { "AMQP\x00\x00\x09\x01" }
1279              
1280             sub _add_to_loop {
1281             my ($self, $loop) = @_;
1282             $self->debug_printf("Added %s to loop", $self);
1283             }
1284              
1285             =head1 future
1286              
1287             Returns a new L instance.
1288              
1289             Supports optional named parameters for setting label etc.
1290              
1291             =cut
1292              
1293             sub future {
1294             my $self = shift;
1295             my $f = $self->loop->new_future;
1296             while(my ($k, $v) = splice @_, 0, 2) {
1297             $f->can($k) ? $f->$k($v) : die "Unable to call method $k on $f";
1298             }
1299             $f
1300             }
1301              
1302             1;
1303              
1304             __END__