File Coverage

blib/lib/Net/Async/AMQP.pm
Criterion Covered Total %
statement 10 12 83.3
branch n/a
condition n/a
subroutine 4 4 100.0
pod n/a
total 14 16 87.5


line stmt bran cond sub pod time code
1             package Net::Async::AMQP;
2             # ABSTRACT: IO::Async support for the AMQP protocol
3 5     5   129651 use strict;
  5         11  
  5         114  
4 5     5   27 use warnings;
  5         11  
  5         135  
5              
6 5     5   3225 use parent qw(IO::Async::Notifier);
  5         1365  
  5         29  
7              
8             our $VERSION = '1.000';
9              
10             =head1 NAME
11              
12             Net::Async::AMQP - provides client interface to AMQP using L
13              
14             =head1 VERSION
15              
16             version 1.000
17              
18             =head1 SYNOPSIS
19              
20             use IO::Async::Loop;
21             use Net::Async::AMQP;
22             my $loop = IO::Async::Loop->new;
23             $loop->add(my $amqp = Net::Async::AMQP->new);
24             $amqp->connect(
25             host => 'localhost',
26             user => 'guest',
27             pass => 'guest',
28             )->get;
29              
30             =head1 DESCRIPTION
31              
32             Does AMQP things. Note that the API may change before the stable 1.000
33             release - L are listed below if you want
34             to evaluate other options.
35              
36             If you want a higher-level API which manages channels and connections, try
37             L.
38              
39             Examples are in the C directory.
40              
41             =head2 AMQP support
42              
43             The following AMQP features are supported:
44              
45             =over 4
46              
47             =item * Queue declare, bind, delete
48              
49             =item * Exchange declare, delete
50              
51             =item * Consumer setup and cancellation
52              
53             =item * Message publishing
54              
55             =item * Explicit ACK
56              
57             =item * QoS
58              
59             =item * SSL
60              
61             =back
62              
63             =head2 RabbitMQ-specific features
64              
65             RabbitMQ provides some additional features:
66              
67             =over 4
68              
69             =item * Exchange-to-exchange binding
70              
71             =item * Server flow control notification
72              
73             =item * Consumer cancellation notification
74              
75             =back
76              
77             =head2 Missing features
78              
79             The following features aren't currently implemented - raise a request via RT or by email (L)
80             if you want any of these:
81              
82             =over 4
83              
84             =item * Transactions
85              
86             =item * Flow control
87              
88             =item * SASL auth
89              
90             =back
91              
92             This implementation is designed to handle many simultaneous channels and connections. If you just want a
93             single consumer/publisher, one of the librabbitmq-c implementations may be sufficient.
94              
95             =cut
96              
97 5     5   111151 use Net::AMQP;
  0            
  0            
98             use Net::AMQP::Common qw(:all);
99              
100             use Future;
101             use curry::weak;
102             use Class::ISA ();
103             use List::Util qw(min);
104             use List::UtilsBy qw(extract_by);
105             use File::ShareDir ();
106             use Time::HiRes ();
107             use Scalar::Util qw(weaken);
108             use Mixin::Event::Dispatch::Bus;
109              
110             =head1 CONSTANTS
111              
112             =head2 AUTH_MECH
113              
114             Defines the mechanism used for authentication. Currently only AMQPLAIN
115             is supported.
116              
117             =cut
118              
119             use constant AUTH_MECH => 'AMQPLAIN';
120              
121             =head2 PAYLOAD_HEADER_LENGTH
122              
123             Length of header used in payload messages. Defined by the AMQP standard
124             as 8 bytes.
125              
126             =cut
127              
128             use constant PAYLOAD_HEADER_LENGTH => 8;
129              
130             =head2 MAX_FRAME_SIZE
131              
132             Largest amount of data we'll attempt to send in a single frame. Actual
133             frame limit will be negotiated with the remote server. Defaults to 262144.
134              
135             =cut
136              
137             use constant MAX_FRAME_SIZE => 262144;
138              
139             =head2 MAX_CHANNELS
140              
141             Maximum number of channels to request. Defaults to the AMQP limit (65535).
142             Attempting to set this any higher will not end well, it's an unsigned 16-bit
143             value.
144              
145             =cut
146              
147             use constant MAX_CHANNELS => 65535;
148              
149             =head2 HEARTBEAT_INTERVAL
150              
151             Interval in seconds between heartbeat frames, zero to disable. Can be
152             overridden by C in the environment, default
153             is 0 (disabled).
154              
155             =cut
156              
157             use constant HEARTBEAT_INTERVAL => $ENV{PERL_AMQP_HEARTBEAT_INTERVAL} // 0;
158              
159             use Net::Async::AMQP::Channel;
160             use Net::Async::AMQP::Queue;
161             use Net::Async::AMQP::Utils;
162              
163             =head1 PACKAGE VARIABLES
164              
165             =head2 $XML_SPEC
166              
167             This defines the path to the AMQP XML spec, which L uses
168             to create methods and handlers for the appropriate version of the MQ
169             protocol.
170              
171             Defaults to an extended version of the 0.9.1 protocol as used by RabbitMQ,
172             this is found in the C distribution sharedir (see
173             L).
174              
175             Normally, you should be able to ignore this. If you want to load an alternative
176             spec, note that (a) this is global, rather than per-instance, (b) it needs to
177             be set before you C this module.
178              
179             BEGIN { $Net::Async::AMQP::XML_SPEC = '/tmp/amqp.xml' }
180             use Net::Async::AMQP;
181              
182             Once loaded, this module will not attempt to apply the spec again.
183              
184             =cut
185              
186             our $XML_SPEC;
187             our $SPEC_LOADED;
188             BEGIN {
189             $XML_SPEC //= File::ShareDir::dist_file(
190             'Net-Async-AMQP',
191             'amqp0-9-1.extended.xml'
192             );
193              
194             # Load the appropriate protocol definitions. RabbitMQ uses a
195             # modified version of AMQP 0.9.1
196             Net::AMQP::Protocol->load_xml_spec($XML_SPEC) unless $SPEC_LOADED++;
197             }
198              
199             =head1 %CONNECTION_DEFAULTS
200              
201             The default parameters to use for L. Changing these values is permitted,
202             but do not attempt to delete from or add any entries to the hash.
203              
204             Passing parameters directly to L is much safer, please do that instead.
205              
206             =cut
207              
208             our %CONNECTION_DEFAULTS = (
209             port => 5672,
210             host => 'localhost',
211             user => 'guest',
212             pass => 'guest',
213             );
214              
215             =head1 METHODS
216              
217             =cut
218              
219             =head2 configure
220              
221             Set up variables. Takes the following optional named parameters:
222              
223             =over 4
224              
225             =item * heartbeat_interval - (optional) interval between heartbeat messages,
226             default is set by the L constant
227              
228             =item * max_channels - how many channels to allow on this connection,
229             default is defined by the L constant
230              
231             =back
232              
233             Returns the new instance.
234              
235             =cut
236              
237             sub configure {
238             my ($self, %args) = @_;
239             for (qw(heartbeat_interval max_channels)) {
240             $self->{$_} = delete $args{$_} if exists $args{$_}
241             }
242             $self->SUPER::configure(%args)
243             }
244              
245             =head2 bus
246              
247             Event bus. Used for sharing global events such as connection closure.
248              
249             =cut
250              
251             sub bus { $_[0]->{bus} ||= Mixin::Event::Dispatch::Bus->new }
252              
253             =head2 connect
254              
255             Takes the following parameters:
256              
257             =over 4
258              
259             =item * port - the AMQP port, defaults to 5672, can be a service name if preferred
260              
261             =item * host - host to connect to, defaults to localhost
262              
263             =item * local_host - our local IP to connect from
264              
265             =item * user - which user to connect as, defaults to guest
266              
267             =item * pass - the password for this user, defaults to guest
268              
269             =item * ssl - true if you want to connect over SSL
270              
271             =item * SSL_* - SSL-specific parameters, see L and L for details
272              
273             =back
274              
275             Returns $self.
276              
277             =cut
278              
279             sub connect {
280             my $self = shift;
281             my %args = @_;
282              
283             die 'no loop' unless my $loop = $self->loop;
284              
285             my $f = $self->loop->new_future;
286              
287             # Apply defaults
288             $self->{$_} = $args{$_} //= $CONNECTION_DEFAULTS{$_} for keys %CONNECTION_DEFAULTS;
289              
290             # Remember our event callbacks so we can unsubscribe
291             my $connected;
292             my $close;
293              
294             # Clean up once we succeed/fail
295             $f->on_ready(sub {
296             $self->bus->unsubscribe_from_event(close => $close) if $close;
297             $self->bus->unsubscribe_from_event(connected => $connected) if $connected;
298             undef $close;
299             undef $connected;
300             undef $self;
301             undef $f;
302             });
303              
304             # One-shot event on connection
305             $self->bus->subscribe_to_event(connected => $connected = sub {
306             $f->done($self) unless $f->is_ready;
307             });
308             # Also pick up connection termination
309             $self->bus->subscribe_to_event(close => $close = sub {
310             $f->fail(connect => 'Remote closed connection') unless $f->is_ready;
311             });
312              
313             # Support SSL connection
314             require IO::Async::SSL if $args{ssl};
315             my $method = $args{ssl} ? 'SSL_connect' : 'connect';
316             $loop->$method(
317             host => $self->{host},
318             # local_host can be used to send from a different source address,
319             # sometimes useful for routing purposes or loadtesting
320             (exists $args{local_host} ? (local_host => $args{local_host}) : ()),
321             service => $self->{port},
322             socktype => 'stream',
323              
324             on_stream => $self->curry::on_stream(\%args),
325              
326             on_resolve_error => $f->curry::fail('resolve'),
327             on_connect_error => $f->curry::fail('connect'),
328             ($args{ssl}
329             ? (on_ssl_error => $f->curry::fail('ssl'))
330             : ()
331             ),
332             (map {; $_ => $args{$_} } grep /^SSL/, keys %args)
333             );
334             $f;
335             }
336              
337             =head2 on_stream
338              
339             Called once the underlying TCP connection has been established.
340              
341             Returns nothing of importance.
342              
343             =cut
344              
345             sub on_stream {
346             my ($self, $args, $stream) = @_;
347             $self->debug_printf("Stream received");
348             $self->{stream} = $stream;
349             $stream->configure(
350             on_read => $self->curry::weak::on_read,
351             );
352             $self->add_child($stream);
353             $self->apply_heartbeat_timer if $self->heartbeat_interval;
354             $self->post_connect(%$args);
355             return;
356             }
357              
358             sub dump_frame {
359             my ($self, $pkt) = @_;
360             my ($type) = unpack 'C1', substr $pkt, 0, 1, '';
361             printf "Type: %02x (%s)\n", $type, {
362             1 => 'Method',
363             }->{$type};
364              
365             my ($chan) = unpack 'n1', substr $pkt, 0, 2, '';
366             printf "Channel: %d\n", $chan;
367              
368             my ($len) = unpack 'N1', substr $pkt, 0, 4, '';
369             printf "Length: %d bytes\n", $len;
370              
371             if($type == 1) {
372             my ($class, $method) = unpack 'n1n1', substr $pkt, 0, 4, '';
373             printf "Class: %s\n", $class;
374             printf "Method: %s\n", $method;
375             }
376             }
377              
378             =head2 on_read
379              
380             Called whenever there's data available to be read.
381              
382             =cut
383              
384             sub on_read {
385             my ($self, $stream, $buffref, $eof) = @_;
386              
387             $self->last_frame_time(Time::HiRes::time);
388              
389             # As each frame is parsed it will be removed from the buffer
390             $self->process_frame($_) for Net::AMQP->parse_raw_frames($buffref);
391             $self->on_closed if $eof;
392             return 0;
393             }
394              
395             =head2 on_closed
396              
397             Called when the TCP connection is closed.
398              
399             =cut
400              
401             sub on_closed {
402             my $self = shift;
403             my $reason = shift // 'unknown';
404             $self->debug_printf("Connection closed [%s]", $reason);
405              
406             for my $ch (grep $_, values %{$self->{channel_by_id}}) {
407             $ch->bus->invoke_event(
408             'close',
409             code => undef,
410             reason => 'Connection closed: ' . $reason,
411             );
412             $self->channel_closed($ch->id);
413             }
414              
415             # Clean up any mismatching entries in the Future map
416             $_->cancel for grep !$_->is_ready, values %{$self->{channel_map}};
417             $self->{channel_map} = {};
418              
419             $self->stream->close if $self->stream;
420             for (qw(stream heartbeat_send_timer heartbeat_receive_timer)) {
421             $self->debug_printf("Remove child %s", $_);
422             (delete $self->{$_})->remove_from_parent if $self->{$_};
423             }
424             $self->bus->invoke_event(close => $reason)
425             }
426              
427             =head2 post_connect
428              
429             Sends initial startup header and applies listener for the C< Connection::Start > message.
430              
431             Returns $self.
432              
433             =cut
434              
435             sub post_connect {
436             my $self = shift;
437             my %args = @_;
438              
439             my %client_prop = (
440             platform => $args{platform} // 'Perl/NetAsyncAMQP',
441             product => $args{product} // __PACKAGE__,
442             information => $args{information} // 'http://search.cpan.org/perldoc?Net::Async::AMQP',
443             version => $args{version} // $VERSION,
444             ($args{client_properties} ? %{$args{client_properties}} : ()),
445             );
446              
447             $self->push_pending(
448             'Connection::Start' => sub {
449             my ($self, $frame) = @_;
450             my $method_frame = $frame->method_frame;
451             my @mech = split ' ', $method_frame->mechanisms;
452             die "Auth mechanism " . AUTH_MECH . " not supported, unable to continue - options were: @mech" unless grep $_ eq AUTH_MECH, @mech;
453             my $output = Net::AMQP::Frame::Method->new(
454             channel => 0,
455             method_frame => Net::AMQP::Protocol::Connection::StartOk->new(
456             client_properties => \%client_prop,
457             mechanism => AUTH_MECH,
458             locale => $args{locale} // 'en_GB',
459             response => {
460             LOGIN => $args{user},
461             PASSWORD => $args{pass},
462             },
463             ),
464             );
465             $self->setup_tuning(%args);
466             $self->send_frame($output);
467             }
468             );
469              
470             # Send the initial header bytes. It'd be nice
471             # if we could use L
472             # for this, but it seems to be sending 1 for
473             # the protocol ID, and the revision number is
474             # before the major/minor version.
475             # $self->write(Net::AMQP::Protocol->header);
476             $self->write($self->header_bytes);
477             $self
478             }
479              
480             =head2 setup_tuning
481              
482             Applies listener for the Connection::Tune message, used for determining max frame size and heartbeat settings.
483              
484             Returns $self.
485              
486             =cut
487              
488             sub setup_tuning {
489             my $self = shift;
490             my %args = @_;
491             $self->push_pending(
492             'Connection::Tune' => sub {
493             my ($self, $frame) = @_;
494             my $method_frame = $frame->method_frame;
495             # Lowest value for frame max wins - our predef constant, or whatever the server suggests
496             $self->frame_max(my $frame_max = min $method_frame->frame_max, $self->MAX_FRAME_SIZE);
497             $self->channel_max(my $channel_max = $method_frame->channel_max || $self->max_channels || $self->MAX_CHANNELS);
498             $self->debug_printf("Remote says %d channels, will use %d", $method_frame->channel_max, $channel_max);
499             $self->{channel} = 0;
500             $self->send_frame(
501             Net::AMQP::Protocol::Connection::TuneOk->new(
502             channel_max => $channel_max,
503             frame_max => $frame_max,
504             heartbeat => $self->heartbeat_interval,
505             )
506             );
507             $self->open_connection(%args);
508             }
509             );
510             }
511              
512             =head2 open_connection
513              
514             Establish a new connection to a vhost - this is called after tuning is complete,
515             and must happen before any channel connections are attempted.
516              
517             Returns $self.
518              
519             =cut
520              
521             sub open_connection {
522             my $self = shift;
523             my %args = @_;
524             $self->setup_connection(%args);
525             $self->send_frame(
526             Net::AMQP::Frame::Method->new(
527             method_frame => Net::AMQP::Protocol::Connection::Open->new(
528             virtual_host => $args{vhost} // '/',
529             capabilities => '',
530             insist => 1,
531             ),
532             )
533             );
534             $self
535             }
536              
537             =head2 setup_connection
538              
539             Applies listener for the Connection::OpenOk message, which triggers the
540             C event.
541              
542             Returns $self.
543              
544             =cut
545              
546             sub setup_connection {
547             my $self = shift;
548             my %args = @_;
549             $self->push_pending(
550             'Connection::OpenOk' => sub {
551             my ($self, $frame) = @_;
552             my $method_frame = $frame->method_frame;
553             $self->debug_printf("OpenOk received");
554             $self->bus->invoke_event(connected =>);
555             }
556             );
557             $self
558             }
559              
560             =head2 next_channel
561              
562             Returns the next available channel ready for L.
563             Note that whatever it reports will be completely wrong if you've
564             manually specified a channel anywhere, so don't do that.
565              
566             If channels have been closed on this connection, those IDs will be
567             reused in preference to handing out a new ID.
568              
569             =cut
570              
571             sub next_channel {
572             my $self = shift;
573             $self->{channel} //= 0;
574             return shift @{$self->{available_channel_id}} if @{$self->{available_channel_id} ||= [] };
575             return undef if $self->{channel} >= $self->channel_max;
576             ++$self->{channel}
577             }
578              
579             =head2 create_channel
580              
581             Returns a new ::Channel instance, populating the map of assigned channels in the
582             process. Takes a single parameter:
583              
584             =over 4
585              
586             =item * $id - the channel ID, can be undef to assign via L
587              
588             =back
589              
590             =cut
591              
592             sub create_channel {
593             my ($self, $id) = @_;
594             $id //= $self->next_channel;
595             die "No channel available" unless $id;
596              
597             my $f = $self->loop->new_future;
598             $self->{channel_map}{$id} = $f;
599             $self->add_child(
600             my $c = Net::Async::AMQP::Channel->new(
601             amqp => $self,
602             future => $f,
603             id => $id,
604             )
605             );
606             $self->{channel_by_id}{$id} = $c;
607             $self->debug_printf("Record channel %d as %s", $id, $c);
608             return $c;
609             }
610              
611             =head2 open_channel
612              
613             Opens a new channel.
614              
615             Returns the new L instance.
616              
617             =cut
618              
619             sub open_channel {
620             my $self = shift;
621             my %args = @_;
622             my $channel;
623             if($args{channel}) {
624             $channel = delete $args{channel};
625             extract_by { $channel == $_ } @{$self->{available_channel_id}} if exists $self->{available_channel_id};
626             } else {
627             $channel = $self->next_channel;
628             }
629             die "Channel " . $channel . " exists already: " . $self->{channel_map}{$channel} if exists $self->{channel_map}{$channel};
630             my $c = $self->create_channel($channel);
631             my $f = $c->future;
632              
633             my $frame = Net::AMQP::Frame::Method->new(
634             method_frame => Net::AMQP::Protocol::Channel::Open->new,
635             );
636             $frame->channel($channel);
637             $c->push_pending(
638             'Channel::OpenOk' => sub {
639             my ($c, $frame) = @_;
640             my $f = $self->{channel_map}{$frame->channel};
641             $f->done($c) unless $f->is_ready;
642             }
643             );
644             $self->send_frame($frame);
645             return $f;
646             }
647              
648             =head2 close
649              
650             Close the connection.
651              
652             Returns a L which will resolve with C<$self> when the connection is closed.
653              
654             =cut
655              
656             sub close {
657             my $self = shift;
658             my %args = @_;
659              
660             $self->heartbeat_send_timer->stop if $self->heartbeat_send_timer;
661              
662             my $f = $self->loop->new_future;
663              
664             # We might end up with a connection shutdown rather
665             # than a clean Connection::Close response, so
666             # we need to handle both possibilities
667             my @handler;
668             $self->bus->subscribe_to_event(
669             @handler = (
670             close => sub {
671             my ($ev, $reason) = @_;
672             splice @handler;
673             eval { $ev->unsubscribe; };
674             return unless $f;
675             $f->done($reason) unless $f->is_ready;
676             weaken $f;
677             }
678             )
679             );
680              
681             my $frame = Net::AMQP::Frame::Method->new(
682             method_frame => Net::AMQP::Protocol::Connection::Close->new(
683             reply_code => $args{code} // 320,
684             reply_text => $args{reason} // 'Request connection close',
685             ),
686             );
687             $self->push_pending(
688             'Connection::CloseOk' => [ $f, $self ],
689             );
690             $self->send_frame($frame);
691              
692             # ... and make sure we clean up after ourselves
693             $f->on_ready(sub {
694             $self->bus->unsubscribe_from_event(
695             @handler
696             );
697             weaken $f if $f;
698             });
699             }
700              
701             =head2 channel_closed
702              
703             =cut
704              
705             sub channel_closed {
706             my ($self, $id) = @_;
707             my $f = delete $self->{channel_map}{$id}
708             or die "Had a close indication for channel $id but this channel is unknown";
709             $f->cancel unless $f->is_ready;
710             $self->remove_child(delete $self->{channel_by_id}{$id});
711              
712             # Record this ID as available for the next time we need to open a new channel
713             push @{$self->{available_channel_id}}, $id;
714             $self
715             }
716              
717             sub channel_by_id { my $self = shift; $self->{channel_by_id}{+shift} }
718              
719             =head2 next_pending
720              
721             Retrieves the next pending handler for the given incoming frame type (see L),
722             and calls it.
723              
724             Takes the following parameters:
725              
726             =over 4
727              
728             =item * $type - the frame type, such as 'Basic::ConnectOk'
729              
730             =item * $frame - the frame itself
731              
732             =back
733              
734             Returns $self.
735              
736             =cut
737              
738             sub next_pending {
739             my ($self, $type, $frame) = @_;
740             $self->debug_printf("Check next pending for %s", $type);
741              
742             if($type eq 'Connection::Close') {
743             $self->on_closed($frame->method_frame->reply_text);
744             return $self;
745             }
746              
747             if(my $next = shift @{$self->{pending}{$type} || []}) {
748             # We have a registered handler for this frame type. This usually
749             # means that we've sent a frame and are awaiting a response.
750             if(ref($next) eq 'ARRAY') {
751             my ($f, @args) = @$next;
752             $f->done(@args) unless $f->is_ready;
753             } else {
754             $next->($self, $frame, @_);
755             }
756             } else {
757             # It's quite possible we'll see unsolicited frames back from
758             # the server: these will typically be errors, connection close,
759             # or consumer cancellation if the consumer_cancel_notify
760             # option is set (RabbitMQ). We don't expect many so report
761             # them when in debug mode.
762             $self->debug_printf("We had no pending handlers for %s, raising as event", $type);
763             $self->bus->invoke_event(
764             unexpected_frame => $type, $frame
765             );
766             }
767             $self
768             }
769              
770             =head1 METHODS - Accessors
771              
772             =head2 host
773              
774             The current host.
775              
776             =cut
777              
778             sub host { shift->{host} }
779              
780             =head2 vhost
781              
782             Virtual host.
783              
784             =cut
785              
786             sub vhost { shift->{vhost} }
787              
788             =head2 port
789              
790             Port number. Usually 5672.
791              
792             =cut
793              
794             sub port { shift->{port} }
795              
796             =head2 user
797              
798             MQ user.
799              
800             =cut
801              
802             sub user { shift->{user} }
803              
804             =head2 frame_max
805              
806             Maximum number of bytes allowed in any given frame. This is the
807             value negotiated with the remote server.
808              
809             =cut
810              
811             sub frame_max {
812             my $self = shift;
813             return $self->{frame_max} unless @_;
814              
815             $self->{frame_max} = shift;
816             $self
817             }
818              
819             =head2 channel_max
820              
821             Maximum number of channels. This is whatever we ended up with after initial negotiation.
822              
823             =cut
824              
825             sub channel_max {
826             my $self = shift;
827             return $self->{channel_max} ||= $self->{max_channels} || $self->MAX_CHANNELS unless @_;
828              
829             $self->{channel_max} = shift;
830             $self
831             }
832              
833             sub max_channels { shift->{max_channels} }
834              
835             =head2 last_frame_time
836              
837             Timestamp of the last frame we received from the remote. Used for handling heartbeats.
838              
839             =cut
840              
841             sub last_frame_time {
842             my $self = shift;
843             return $self->{last_frame_time} unless @_;
844              
845             $self->{last_frame_time} = shift;
846             $self->heartbeat_receive_timer->reset if $self->heartbeat_receive_timer;
847             $self
848             }
849              
850             =head2 stream
851              
852             Returns the current L for the AMQP connection.
853              
854             =cut
855              
856             sub stream { shift->{stream} }
857              
858             =head2 incoming_message
859              
860             L for the current incoming message (received in two or more parts:
861             the header then all body chunks).
862              
863             =cut
864              
865             sub incoming_message { shift->{incoming_message} }
866              
867             =head1 METHODS - Internal
868              
869             The following methods are intended for internal use. They are documented
870             for completeness but should not normally be needed outside this library.
871              
872             =cut
873              
874             =head2 heartbeat_interval
875              
876             Current maximum interval between frames.
877              
878             =cut
879              
880             sub heartbeat_interval { shift->{heartbeat_interval} //= HEARTBEAT_INTERVAL }
881              
882             =head2 missed_heartbeats_allowed
883              
884             How many times we allow the remote to miss the frame-sending deadline in a row
885             before we give up and close the connection. Defined by the protocol, should be
886             3x heartbeats.
887              
888             =cut
889              
890             sub missed_heartbeats_allowed { 3 }
891              
892             =head2 apply_heartbeat_timer
893              
894             Enable both heartbeat timers.
895              
896             =cut
897              
898             sub apply_heartbeat_timer {
899             my $self = shift;
900             { # On expiry, will trigger a heartbeat send from us to the server
901             my $timer = IO::Async::Timer::Countdown->new(
902             delay => $self->heartbeat_interval,
903             on_expire => $self->curry::weak::send_heartbeat,
904             );
905             $self->add_child($timer);
906             $timer->start;
907             Scalar::Util::weaken($self->{heartbeat_send_timer} = $timer);
908             }
909             { # This timer indicates no traffic from the remote for 3*heartbeat
910             my $timer = IO::Async::Timer::Countdown->new(
911             delay => $self->missed_heartbeats_allowed * $self->heartbeat_interval,
912             on_expire => $self->curry::weak::handle_heartbeat_failure,
913             );
914             $self->add_child($timer);
915             $timer->start;
916             Scalar::Util::weaken($self->{heartbeat_receive_timer} = $timer);
917             }
918             $self
919             }
920              
921             =head2 reset_heartbeat
922              
923             Resets our side of the heartbeat timer.
924              
925             This is used to ensure we send data at least once every L
926             seconds.
927              
928             =cut
929              
930             sub reset_heartbeat {
931             my $self = shift;
932             return unless my $timer = $self->heartbeat_send_timer;
933              
934             $timer->reset;
935             }
936              
937              
938             =head2 heartbeat_receive_timer
939              
940             Timer for tracking frames we've received.
941              
942             =cut
943              
944             sub heartbeat_receive_timer { shift->{heartbeat_receive_timer} }
945              
946             =head2 heartbeat_send_timer
947              
948             Timer for tracking when we're due to send out something.
949              
950             =cut
951              
952             sub heartbeat_send_timer { shift->{heartbeat_send_timer} }
953              
954             =head2 handle_heartbeat_failure
955              
956             Called when heartbeats are enabled and we've had no response from the server for 3 heartbeat
957             intervals (see L). We'd expect some frame from the remote - even
958             if just a heartbeat frame - at least once every heartbeat interval so if this triggers then
959             we're likely dealing with a dead or heavily loaded server.
960              
961             This will invoke the L then close the connection.
962              
963             =cut
964              
965             sub handle_heartbeat_failure {
966             my $self = shift;
967             $self->debug_printf("Heartbeat timeout: no data received from server since %s, closing connection", $self->last_frame_time);
968              
969             $self->bus->invoke_event(
970             heartbeat_failure => $self->last_frame_time
971             );
972             $self->close;
973             }
974              
975             =head2 send_heartbeat
976              
977             Sends the heartbeat frame.
978              
979             =cut
980              
981             sub send_heartbeat {
982             my $self = shift;
983             $self->debug_printf("Sending heartbeat frame");
984              
985             # Heartbeat messages apply to the connection rather than
986             # individual channels, so we use channel 0 to represent this
987             $self->send_frame(
988             Net::AMQP::Frame::Heartbeat->new,
989             channel => 0,
990             );
991              
992             # Ensure heartbeat timer is active for next time
993             if(my $timer = $self->heartbeat_send_timer) {
994             $timer->reset;
995             $timer->start;
996             }
997             }
998              
999             =head2 push_pending
1000              
1001             Adds the given handler(s) to the pending handler list for the given type(s).
1002              
1003             Takes one or more of the following parameter pairs:
1004              
1005             =over 4
1006              
1007             =item * $type - the frame type, see L
1008              
1009             =item * $code - the coderef to call, will be invoked once as follows when a matching frame is received:
1010              
1011             $code->($self, $frame, @_)
1012              
1013             =back
1014              
1015             Returns C< $self >.
1016              
1017             =cut
1018              
1019             sub push_pending {
1020             my $self = shift;
1021             while(@_) {
1022             my ($type, $code) = splice @_, 0, 2;
1023             push @{$self->{pending}{$type}}, $code;
1024             }
1025             return $self;
1026             }
1027              
1028             =head2 remove_pending
1029              
1030             Removes a coderef from the pending event handler.
1031              
1032             Returns C< $self >.
1033              
1034             =cut
1035              
1036             sub remove_pending {
1037             my $self = shift;
1038             while(@_) {
1039             my ($type, $code) = splice @_, 0, 2;
1040             # This is the same as extract_by { $_ eq $code } @{$self->{pending}{$type}};,
1041             # but since we'll be calling it a lot might as well do it inline:
1042             splice
1043             @{$self->{pending}{$type}},
1044             $_,
1045             1 for grep {
1046             $self->{pending}{$type}[$_] eq $code
1047             } reverse 0..$#{$self->{pending}{$type}};
1048             }
1049             return $self;
1050             }
1051              
1052             =head2 write
1053              
1054             Writes data to the server.
1055              
1056             Returns a L which will resolve to an empty list when
1057             done.
1058              
1059             =cut
1060              
1061             sub write {
1062             my $self = shift;
1063             $self->stream->write(@_)
1064             }
1065              
1066             =head2 process_frame
1067              
1068             Process a single incoming frame.
1069              
1070             Takes the following parameters:
1071              
1072             =over 4
1073              
1074             =item * $frame - the L instance
1075              
1076             =back
1077              
1078             Returns $self.
1079              
1080             =cut
1081              
1082             sub process_frame {
1083             my ($self, $frame) = @_;
1084             $self->debug_printf("Received %s", amqp_frame_info($frame));
1085              
1086             my $frame_type = amqp_frame_type($frame);
1087              
1088             if($frame_type eq 'Heartbeat') {
1089             # Ignore these completely. Since we have the last frame update at the data-read
1090             # level, there's nothing for us to do here.
1091             $self->debug_printf("Heartbeat received");
1092              
1093             # A peer that receives an invalid heartbeat frame MUST raise a connection
1094             # exception with reply code 501 (frame error)
1095             $self->close(
1096             code => 501,
1097             reason => 'Frame error - heartbeat should have channel 0'
1098             ) if $frame->channel;
1099              
1100             return $self;
1101             } elsif(my $ch = $self->channel_by_id($frame->channel)) {
1102             $self->debug_printf("Processing frame %s on channel %d", $frame_type, $ch);
1103             return $self if $ch->next_pending($frame);
1104             }
1105              
1106             $self->debug_printf("Processing connection frame %s", $frame_type);
1107              
1108             $self->next_pending($frame_type, $frame);
1109              
1110             return $self;
1111             }
1112              
1113             =head2 split_payload
1114              
1115             Splits a message into separate frames.
1116              
1117             Takes the $payload as a scalar containing byte data, and the following parameters:
1118              
1119             =over 4
1120              
1121             =item * exchange - where we're sending the message
1122              
1123             =item * routing_key - other part of message destination
1124              
1125             =back
1126              
1127             Additionally, the following headers can be passed:
1128              
1129             =over 4
1130              
1131             =item * content_type
1132              
1133             =item * content_encoding
1134              
1135             =item * headers
1136              
1137             =item * delivery_mode
1138              
1139             =item * priority
1140              
1141             =item * correlation_id
1142              
1143             =item * reply_to
1144              
1145             =item * expiration
1146              
1147             =item * message_id
1148              
1149             =item * timestamp
1150              
1151             =item * type
1152              
1153             =item * user_id
1154              
1155             =item * app_id
1156              
1157             =item * cluster_id
1158              
1159             =back
1160              
1161             Returns list of frames suitable for passing to L.
1162              
1163             =cut
1164              
1165             sub split_payload {
1166             my $self = shift;
1167             my $payload = shift;
1168             my %opts = @_;
1169              
1170             # Get the original content length first
1171             my $payload_size = length $payload;
1172              
1173             my @body_frames;
1174             while (length $payload) {
1175             my $chunk = substr $payload, 0, $self->frame_max - PAYLOAD_HEADER_LENGTH, '';
1176             push @body_frames, Net::AMQP::Frame::Body->new(
1177             payload => $chunk
1178             );
1179             }
1180              
1181             return
1182             Net::AMQP::Protocol::Basic::Publish->new(
1183             map {; $_ => $opts{$_} } grep defined($opts{$_}), qw(ticket exchange routing_key mandatory immediate)
1184             ),
1185             Net::AMQP::Frame::Header->new(
1186             weight => $opts{weight} || 0,
1187             body_size => $payload_size,
1188             header_frame => Net::AMQP::Protocol::Basic::ContentHeader->new(
1189             map {; $_ => $opts{$_} } grep defined($opts{$_}), qw(
1190             content_type
1191             content_encoding
1192             headers
1193             delivery_mode
1194             priority
1195             correlation_id
1196             reply_to
1197             expiration
1198             message_id
1199             timestamp
1200             type
1201             user_id
1202             app_id
1203             cluster_id
1204             )
1205             ),
1206             ),
1207             @body_frames;
1208             }
1209              
1210             =head2 send_frame
1211              
1212             Send a single frame.
1213              
1214             Takes the $frame instance followed by these optional named parameters:
1215              
1216             =over 4
1217              
1218             =item * channel - which channel we should send on
1219              
1220             =back
1221              
1222             Returns a L which will resolve to an empty list
1223             when the frame has been written (this does not guarantee that the server has received it).
1224              
1225             =cut
1226              
1227             sub send_frame {
1228             my $self = shift;
1229             my $frame = shift;
1230             my %args = @_;
1231              
1232             # Apply defaults and wrap as required
1233             $frame = $frame->frame_wrap if $frame->isa("Net::AMQP::Protocol::Base");
1234             die "Frame has channel ID " . $frame->channel . " but we wanted " . $args{channel}
1235             if defined $frame->channel && defined $args{channel} && $frame->channel != $args{channel};
1236              
1237             $frame->channel($args{channel} // 0) unless defined $frame->channel;
1238              
1239             $self->debug_printf("Sending %s", amqp_frame_info($frame));
1240              
1241             # Get bytes to send across our transport
1242             my $data = $frame->to_raw_frame;
1243              
1244             # warn "Sending data: " . Dumper($frame) . "\n";
1245             $self->write(
1246             $data,
1247             )->on_done($self->curry::reset_heartbeat)
1248             }
1249              
1250             =head2 header_bytes
1251              
1252             Byte string representing the header bytes we should send on initial TCP connect.
1253             Net::AMQP uses AMQP\x01\x01\x09\x01, which does not appear to comply with AMQP 0.9.1
1254             section 4.2.2.
1255              
1256             =cut
1257              
1258             sub header_bytes { "AMQP\x00\x00\x09\x01" }
1259              
1260             sub _add_to_loop {
1261             my ($self, $loop) = @_;
1262             $self->debug_printf("Added %s to loop", $self);
1263             }
1264              
1265             =head1 future
1266              
1267             Returns a new L instance.
1268              
1269             Supports optional named parameters for setting label etc.
1270              
1271             =cut
1272              
1273             sub future {
1274             my $self = shift;
1275             my $f = $self->loop->new_future;
1276             while(my ($k, $v) = splice @_, 0, 2) {
1277             $f->can($k) ? $f->$k($v) : die "Unable to call method $k on $f";
1278             }
1279             $f
1280             }
1281              
1282             1;
1283              
1284             __END__