File Coverage

blib/lib/POE/Component/Client/AMQP.pm
Criterion Covered Total %
statement 10 12 83.3
branch n/a
condition n/a
subroutine 4 4 100.0
pod n/a
total 14 16 87.5


line stmt bran cond sub pod time code
1             package POE::Component::Client::AMQP;
2              
3             =head1 NAME
4              
5             POE::Component::Client::AMQP - Asynchronous AMQP client implementation in POE
6              
7             =head1 SYNOPSIS
8              
9             use POE::Component::Client::AMQP;
10              
11             Net::AMQP::Protocol->load_xml_spec('amqp0-8.xml');
12              
13             my $amq = POE::Component::Client::AMQP->create(
14             RemoteAddress => 'mq.domain.tld',
15             );
16              
17             $amq->channel(1)->queue('frank')->subscribe(sub {
18             my ($payload, $meta) = @_;
19              
20             my $reply_to = $meta->{header_frame}->reply_to;
21              
22             $amq->channel(1)->queue($reply_to)->publish("Message received");
23             });
24              
25             $amq->run();
26              
27             =head1 DESCRIPTION
28              
29             This module implements the Advanced Message Queue Protocol (AMQP) TCP/IP client. It's goal is to provide users with a quick and easy way of using AMQP while at the same time exposing the advanced functionality of the protocol if needed.
30              
31             The (de)serialization and representation logic is handled by L, which needs to be setup (via load_xml_spec()) prior to this client software running. Please see the docs there for further information on this.
32              
33             =cut
34              
35 1     1   40105 use strict;
  1         3  
  1         37  
36 1     1   5 use warnings;
  1         2  
  1         36  
37 1     1   895 use Params::Validate qw(validate validate_with);
  1         9941  
  1         71  
38 1     1   2253 use Net::AMQP;
  0            
  0            
39             use Net::AMQP::Common qw(:all);
40             use Carp;
41             use base qw(Exporter Class::Accessor);
42             __PACKAGE__->mk_accessors(qw(Logger is_stopped is_started is_stopping frame_max));
43              
44             our $VERSION = 0.03;
45              
46             use constant {
47             AMQP_ACK => '__amqp_ack__',
48             AMQP_REJECT => '__amqp_reject__',
49             };
50              
51             my @_constants;
52             our (@EXPORT_OK, %EXPORT_TAGS);
53             BEGIN {
54             @_constants = qw(AMQP_ACK AMQP_REJECT);
55             @EXPORT_OK = (@_constants);
56             %EXPORT_TAGS = ('constants' => [@_constants]);
57             };
58              
59             # Use libraries that require my constants after defining them
60              
61             use POE qw(
62             Filter::Stream
63             Component::Client::AMQP::TCP
64             Component::Client::AMQP::Channel
65             Component::Client::AMQP::Queue
66             );
67              
68             =head1 USAGE
69              
70             =head2 create
71              
72             my $amq = POE::Component::Client::AMQP->create(
73             RemoteAddress => 'mq.domain.tld',
74             );
75              
76             Create a new AMQP client. Arguments to this method:
77              
78             =over 4
79              
80             =item I (default: 127.0.0.1)
81              
82             Connect to this host
83              
84             =item I (default: 5672)
85              
86             =item I (default: guest)
87              
88             =item I (default: guest)
89              
90             =item I (default: /)
91              
92             =item I (default: simple screen logger)
93              
94             Provide an object which implements 'debug', 'info' and 'error' logging methods (such as L).
95              
96             =item I
97              
98             This module provides extensive debugging options. These are specified as a hash as follows:
99              
100             =over 4
101              
102             =item I (boolean)
103              
104             Display decisions the code is making
105              
106             =item I (boolean)
107              
108             =item I (boolean)
109              
110             Use the I code to display frames that come in from or out to the server.
111              
112             =item I (coderef)
113              
114             A coderef which, given a L object, will return a string representation of it, prefixed with "\n".
115              
116             =item I (boolean)
117              
118             =item I (boolean)
119              
120             Use the I code to display raw data that comes in from or out to the server.
121              
122             =item I (coderef)
123              
124             A coderef which, given a raw string, will return a byte representation of it, prefixed with "\n".
125              
126             =back
127              
128             =item I (default: 0)
129              
130             If set, will send a L frame every Keepalive seconds after the last activity on the connection. This is a mechanism to keep a long-open TCP session alive.
131              
132             =item I (default: amqp_client)
133              
134             The POE session alias of the main client session
135              
136             =item I (default: tcp_client)
137              
138             The POE session alias of the TCP client
139              
140             =item I (default: {})
141              
142             Provide callbacks. At the moment, 'Startup' and 'FrameSent' are the only recognized callback.
143              
144             FrameSent will be called with $self and the Net::AMQP::Frame being sent.
145              
146             =item I
147              
148             Set to '1' to avoid creating POE::Sessions (mainly useful in t/ scripts)
149              
150             =back
151              
152             Returns a class object.
153              
154             =cut
155              
156             sub create {
157             my $class = shift;
158              
159             my %self = validate_with(
160             params => \@_,
161             spec => {
162             RemoteAddress => { default => '127.0.0.1' },
163             RemotePort => 0,
164             Username => { default => 'guest' },
165             Password => { default => 'guest' },
166             VirtualHost => { default => '/' },
167              
168             Logger => 0,
169             Debug => { default => {} },
170              
171             Alias => { default => 'amqp_client' },
172             AliasTCP => { default => 'tcp_client' },
173             Callbacks => { default => {} },
174             SSL => { default => 0 },
175             Keepalive => { default => 0 },
176             Reconnect => { default => 0 },
177              
178             channels => { default => {} },
179             is_started => { default => 0 },
180             is_testing => { default => 0 },
181             is_stopped => { default => 0 },
182             frame_max => { default => 0 },
183             },
184             allow_extra => 1,
185             );
186              
187             $self{RemotePort} ||= $self{SSL} ? 5671 : 5672;
188              
189             $self{Logger} ||= POE::Component::Client::AMQP::FakeLogger->new(
190             debug => keys(%{ $self{Debug} }) ? 1 : 0,
191             );
192              
193             my $self = bless \%self, $class;
194              
195             my %Debug = validate_with(
196             params => $self->{Debug},
197             spec => {
198             raw_input => 0,
199             raw_output => 0,
200             raw_dumper => { default => sub {
201             my $output = shift;
202             return "\nraw [".length($output)."]: ".show_ascii($output);
203             } },
204              
205             frame_input => 0,
206             frame_output => 0,
207             frame_dumper => { default => sub {} },
208              
209             logic => 0,
210             },
211             );
212             $self->{Debug} = \%Debug;
213              
214             POE::Session->create(
215             object_states => [
216             $self => [qw(
217             _start
218             server_send
219             server_connected
220             server_disconnect
221             shutdown
222             keepalive
223             )],
224             ],
225             ) unless $self->{is_testing};
226              
227             # If the user passed an arrayref as the RemoteAddress, pick one
228             # at random to connect to.
229             if (ref $self->{RemoteAddress}) {
230             # Shuffle the RemoteAddress array (thanks http://community.livejournal.com/perl/101830.html)
231             my $array = $self->{RemoteAddress};
232             for (my $i = @$array; --$i; ) {
233             my $j = int rand ($i+1);
234             next if $i == $j;
235             @$array[$i,$j] = @$array[$j,$i];
236             }
237              
238             # Take the first shuffled address and move it to the back
239             $self->{current_RemoteAddress} = shift @{ $self->{RemoteAddress} };
240             push @{ $self->{RemoteAddress} }, $self->{current_RemoteAddress};
241             }
242             else {
243             $self->{current_RemoteAddress} = $self->{RemoteAddress};
244             }
245              
246             POE::Component::Client::AMQP::TCP->new(
247             Alias => $self->{AliasTCP},
248             RemoteAddress => $self->{current_RemoteAddress},
249             RemotePort => $self->{RemotePort},
250             Connected => sub { $self->tcp_connected(@_) },
251             Disconnected => sub { $self->tcp_disconnected(@_) },
252             ConnectError => sub { $self->tcp_connect_error(@_) },
253             ConnectTimeout => 20,
254             ServerInput => sub { $self->tcp_server_input(@_) },
255             ServerFlushed => sub { $self->tcp_server_flush(@_) },
256             ServerError => sub { $self->tcp_server_error(@_) },
257             Filter => 'POE::Filter::Stream',
258             SSL => $self->{SSL},
259             InlineStates => {
260             reconnect_delayed => sub { $self->tcp_reconnect_delayed(@_) },
261             },
262             ) unless $self->{is_testing};
263              
264             return $self;
265             }
266              
267             ## Public Class Methods ###
268              
269             =head1 CLASS METHODS
270              
271             =head2 do_when_startup (...)
272              
273             =over 4
274              
275             Pass a subref that should be executed after the client has connected and authenticated with the remote AMQP server. If the client is already connected and authenticated, the subref will be called immediately. Think: deferred.
276              
277             =back
278              
279             =cut
280              
281             sub do_when_startup {
282             my ($self, $subref) = @_;
283              
284             if ($self->{is_started}) {
285             $subref->();
286             }
287             else {
288             push @{ $self->{Callbacks}{Startup} }, $subref;
289             }
290             }
291              
292             =head2 channel ($id)
293              
294             =over 4
295              
296             Call with an optional argument $id (1 - 65536). Returns a L object which can be used immediately.
297              
298             =back
299              
300             =cut
301              
302             sub channel {
303             my ($self, $id, $opts) = @_;
304             $opts ||= {};
305              
306             if (defined $id && $self->{channels}{$id}) {
307             return $self->{channels}{$id};
308             }
309              
310             my $channel = POE::Component::Client::AMQP::Channel->create(
311             id => $id,
312             server => $self,
313             %$opts,
314             );
315              
316             # We don't need to record the channel, as the Channel->create() did so already in our 'channels' hash
317              
318             return $channel;
319             }
320              
321             =head2 run ()
322              
323             =over 4
324              
325             Shortcut to calling $poe_kernel->run
326              
327             =back
328              
329             =cut
330              
331             sub run {
332             $poe_kernel->run();
333             }
334              
335             =head2 stop ()
336              
337             =over 4
338              
339             Shortcut to calling the POE state 'disconnect'
340              
341             =back
342              
343             =cut
344              
345             sub stop {
346             my $self = shift;
347             $poe_kernel->call($self->{Alias}, 'server_disconnect');
348             }
349              
350             =head2 compose_basic_publish ($payload, %options)
351              
352             =over 4
353              
354             A helper method to generate the frames necessary for a basic publish. Returns a L, L (wrapping a L frame) followed by zero or more L frames. Since the arguments for each one of these frames are unique, the %options hash provides options for all of the frames.
355              
356             The following options are supported, all of which are optional, some having sane defaults:
357              
358             =over 4
359              
360             =item I
361              
362             =over 4
363              
364             =item I (default: 0)
365              
366             =back
367              
368             =item I
369              
370             =over 4
371              
372             =item I (default: 0)
373              
374             =item I
375              
376             =item I
377              
378             =item I (default: 1)
379              
380             =item I
381              
382             =back
383              
384             =item I
385              
386             =over 4
387              
388             =item I
389              
390             =item I
391              
392             =item I (default: {})
393              
394             =item I (default: 1)
395              
396             =item I (default: 1)
397              
398             =item I
399              
400             =item I
401              
402             =item I
403              
404             =item I
405              
406             =item I
407              
408             =item I
409              
410             =item I
411              
412             =item I
413              
414             =item I
415              
416             =back
417              
418             =back
419              
420             =back
421              
422             =cut
423              
424             sub compose_basic_publish {
425             my ($self, $payload) = (shift, shift);
426              
427             my %opts = validate(@_, {
428             # Header options
429             weight => { default => 0 },
430              
431             # Method options
432             ticket => { default => 0 },
433             exchange => 0,
434             routing_key => 0,
435             mandatory => { default => 1 },
436             immediate => 0,
437              
438             # Content options
439             content_type => 0,
440             content_encoding => 0,
441             headers => { default => {} },
442             delivery_mode => { default => 1 }, # non-persistent
443             priority => { default => 1 },
444             correlation_id => 0,
445             reply_to => 0,
446             expiration => 0,
447             message_id => 0,
448             timestamp => 0,
449             type => 0,
450             user_id => 0,
451             app_id => 0,
452             cluster_id => 0,
453             });
454              
455             my $payload_size = length $payload;
456             my @body_frames;
457             while (length $payload) {
458             my $partial = substr $payload, 0, $self->frame_max - 8, '';
459             push @body_frames, Net::AMQP::Frame::Body->new(payload => $partial);
460             }
461              
462             return (
463             Net::AMQP::Protocol::Basic::Publish->new(
464             map { $_ => $opts{$_} }
465             grep { defined $opts{$_} }
466             qw(ticket exchange routing_key mandatory immediate)
467             ),
468             Net::AMQP::Frame::Header->new(
469             weight => $opts{weight},
470             body_size => $payload_size,
471             header_frame => Net::AMQP::Protocol::Basic::ContentHeader->new(
472             map { $_ => $opts{$_} }
473             grep { defined $opts{$_} }
474             qw(content_type content_encoding headers delivery_mode priority correlation_id
475             reply_to expiration message_id timestamp type user_id app_id cluster_id)
476             ),
477             ),
478             @body_frames,
479             );
480             }
481              
482             =head1 POE STATES
483              
484             The following are states you can post to to interact with the client. Use the alias defined in the C call above.
485              
486             =cut
487              
488             sub _start {
489             my ($self, $kernel) = @_[OBJECT, KERNEL];
490              
491             $kernel->alias_set($self->{Alias});
492             }
493              
494             =head2 server_disconnect
495              
496             =over 4
497              
498             Send a Connection.Close request
499              
500             =back
501              
502             =cut
503              
504             sub server_disconnect {
505             my ($self, $kernel) = @_[OBJECT, KERNEL];
506              
507             $self->{is_stopping} = 1;
508              
509             # Don't defer my disconnect request just because we're waiting for the response to a synchronous method
510             $self->{wait_synchronous} = {};
511              
512             $kernel->yield(server_send =>
513             Net::AMQP::Frame::Method->new(
514             synchronous_callback => sub {
515             $self->{is_stopped} = 1;
516             $self->{is_started} = 0;
517             },
518             method_frame => Net::AMQP::Protocol::Connection::Close->new(),
519             )
520             );
521             }
522              
523             sub server_connected {
524             my ($self, $kernel) = @_[OBJECT, KERNEL];
525              
526             $self->{Logger}->info("Connected to the AMQP server ".($self->{SSL} ? '(over SSL) ' : '')."and ready to act");
527              
528             $self->do_callback('Startup');
529              
530             $self->{is_started} = 1;
531              
532             if ($self->{Keepalive}) {
533             $kernel->delay(keepalive => $self->{Keepalive});
534             }
535             }
536              
537             =head2 server_send (@output)
538              
539             =over 4
540              
541             Pass one or more L objects. For short hand, you may pass L objects, which will be automatically wrapped in the appropriate frame type, with channel 0. These frames will be written to the server. In the case of L objects which are calling a synchronous method, the client will handle them one at a time, waiting until a synchronous method returns properly before sending further synchronous frames. This happens automatically.
542              
543             =back
544              
545             =cut
546              
547             sub server_send {
548             my ($self, $kernel, @output) = @_[OBJECT, KERNEL, ARG0 .. $#_];
549              
550             if ($self->{is_stopped}) {
551             $self->{Logger}->error("Server send called while stopped with ".int(@output)." messages");
552             push @{ $self->{pending_server_send} }, @output;
553             # FIXME: nothing is currently done with this pending server send queue; users can choose
554             # to resend them in their Reconnected callback
555             return;
556             }
557              
558             while (my $output = shift @output) {
559             if (! defined $output || ! ref $output) {
560             $self->{Logger}->error("Server send called with invalid output (".(defined $output ? $output : 'undef').")");
561             next;
562             }
563              
564             if ($output->isa("Net::AMQP::Protocol::Base")) {
565             $output = $output->frame_wrap;
566             }
567              
568             if (! $output->isa("Net::AMQP::Frame")) {
569             $self->{Logger}->error("Server send called with invalid output (".ref($output).")");
570             next;
571             }
572              
573             # Set default channel
574             $output->channel(0) unless defined $output->channel;
575              
576             if ($output->isa('Net::AMQP::Frame::Method') && $output->method_frame->method_spec->{synchronous}) {
577             # If we're calling a synchronous method, then the server won't send any other
578             # synchronous replies of particular type(s) until this message is replied to.
579             # Wait for replies of these type(s) and don't send other messages until they're
580             # cleared.
581              
582             my $output_class = ref($output->method_frame);
583              
584             $self->{wait_synchronous}{ $output->channel } ||= {};
585             my $wait_synchronous = $self->{wait_synchronous}{ $output->channel };
586              
587             # FIXME: It appears that RabbitMQ won't let us do two disimilar synchronous requests at once
588             if (my @waiting_classes = keys %$wait_synchronous) {
589             $self->{Logger}->debug("Class $waiting_classes[0] is already waiting; do nothing else until it's complete; defering")
590             if $self->{Debug}{logic};
591             push @{ $wait_synchronous->{ $waiting_classes[0] }{process_after} }, [ $output, @output ];
592             return;
593             }
594              
595             # if ($self->{wait_synchronous}{$output_class}) {
596             # # There are already other things waiting; enqueue this output
597             # $self->{Logger}->debug("Class $output_class is already synchronously waiting; defering this and subsequent output")
598             # if $self->{Debug}{logic};
599             # push @{ $self->{wait_synchronous}{$output_class}{process_after} }, [ $output, @output ];
600             # return;
601             # }
602              
603             my $responses = $output_class->method_spec->{responses};
604              
605             if (keys %$responses) {
606             $self->{Logger}->debug("Setting up synchronous callback for $output_class")
607             if $self->{Debug}{logic};
608             $wait_synchronous->{$output_class} = {
609             request => $output,
610             responses => $responses,
611             process_after => [],
612             };
613             }
614             }
615              
616             my $raw_output = $output->to_raw_frame();
617             $self->{Logger}->debug(
618             'chan(' . $output->channel . ") >>> ".$output->type_string
619             . ($self->{Debug}{frame_output} ? $self->{Debug}{frame_dumper}($output) : '')
620             . ($self->{Debug}{raw_output} ? $self->{Debug}{raw_dumper}($raw_output) : '')
621             );
622              
623             $self->{HeapTCP}{server}->put($raw_output);
624             $self->{last_server_put} = time;
625             $self->do_callback('FrameSent', $output);
626             }
627             }
628              
629             =head2 shutdown ()
630              
631             =over 4
632              
633             If you need to stop things immediately, call shutdown(). This is not graceful.
634              
635             =back
636              
637             =cut
638              
639             sub shutdown {
640             my ($self, $kernel) = @_[OBJECT, KERNEL];
641              
642             $self->{is_stopped} = 1;
643              
644             # Clear any alarms that may be set ('keepalive', for instance)
645             $poe_kernel->alarm_remove_all();
646              
647             $kernel->call($self->{AliasTCP}, 'shutdown');
648             }
649              
650             =head2 keepalive
651              
652             Sends a Heartbeat frame at a regular interval to keep the TCP session from timing out.
653              
654             =cut
655              
656             sub keepalive {
657             my ($self, $kernel) = @_[OBJECT, KERNEL];
658              
659             return unless $self->{Keepalive} > 0;
660              
661             my $idle_time = time - $self->{last_server_put};
662             my $delay = $self->{Keepalive};
663             if ($idle_time >= $self->{Keepalive}) {
664             $kernel->yield(server_send =>
665             Net::AMQP::Frame::Heartbeat->new()
666             );
667             }
668             else {
669             $delay -= $idle_time;
670             }
671              
672             $kernel->delay(keepalive => $delay);
673             }
674              
675             ## Private Class Methods ###
676              
677             sub tcp_connected {
678             my $self = shift;
679             my ($kernel, $heap) = @_[KERNEL, HEAP];
680              
681             $self->{Logger}->debug("Connected to remote host");
682              
683             #$self->{Logger}->debug("Sending 4.2.2 Protocol Header");
684             $heap->{server}->put( Net::AMQP::Protocol->header );
685              
686             # If 'reconnect_attempt' has a value, we have reconnected
687             if ($self->{reconnect_attempt}) {
688             $self->{reconnect_attempt} = 0;
689             $self->do_callback('Reconnected');
690             }
691              
692             $self->{HeapTCP} = $heap;
693             $self->{is_stopped} = 0;
694             }
695              
696             sub tcp_server_flush {
697             my $self = shift;
698             my ($kernel, $heap) = @_[KERNEL, HEAP];
699              
700             #$self->{Logger}->debug("Server flush");
701             }
702              
703             sub tcp_server_input {
704             my $self = shift;
705             my ($kernel, $heap, $input) = @_[KERNEL, HEAP, ARG0];
706              
707             # FIXME: Not every record is complete; it may be split at 16384 bytes
708             # FIXME: Checking last octet is not best; find better way!
709             my $frame_end_octet = unpack 'C', substr $input, -1, 1;
710             if ($frame_end_octet != 206) {
711             $self->{Logger}->debug("Server input length ".length($input)." without frame end octet");
712             $self->{buffered_input} = '' unless defined $self->{buffered_input};
713             $self->{buffered_input} .= $input;
714             return;
715             }
716             elsif (defined $self->{buffered_input}) {
717             $input = delete($self->{buffered_input}) . $input;
718             }
719              
720             $self->{Logger}->debug("Server said: " . $self->{Debug}{raw_dumper}($input))
721             if $self->{Debug}{raw_input};
722              
723             my @frames = Net::AMQP->parse_raw_frames(\$input);
724             FRAMES:
725             foreach my $frame (@frames) {
726             $self->{Logger}->debug(
727             'chan(' . $frame->channel . ") <<< ".$frame->type_string
728             . ($self->{Debug}{frame_input} ? $self->{Debug}{frame_dumper}($frame) : '')
729             );
730              
731             my $handled = 0;
732             if ($frame->channel != 0) {
733             my $channel = $self->{channels}{ $frame->channel };
734             if (! $channel) {
735             $self->{Logger}->error("Received frame on channel ".$frame->channel." which we didn't request the creation of");
736             next FRAMES;
737             }
738             $kernel->post($channel->{Alias}, server_input => $frame);
739             $handled++;
740             }
741              
742             if ($frame->isa('Net::AMQP::Frame::Method')) {
743             my $method_frame = $frame->method_frame;
744              
745             # Check the 'wait_synchronous' hash to see if this response is a synchronous reply
746             my $method_frame_class = ref $method_frame;
747             if ($method_frame_class->method_spec->{synchronous}) {
748             $self->{Logger}->debug("Checking 'wait_synchronous' hash against $method_frame_class") if $self->{Debug}{logic};
749              
750             my $matching_output_class;
751             while (my ($output_class, $details) = each %{ $self->{wait_synchronous}{ $frame->channel } }) {
752             next unless $details->{responses}{ $method_frame_class };
753             $matching_output_class = $output_class;
754             last;
755             }
756              
757             if ($matching_output_class) {
758             $self->{Logger}->debug("Response type '$method_frame_class' found from waiting request '$matching_output_class'")
759             if $self->{Debug}{logic};
760              
761             my $details = delete $self->{wait_synchronous}{ $frame->channel }{$matching_output_class};
762              
763             # Call the asynch callback if there is one
764             if (my $callback = delete $details->{request}{synchronous_callback}) {
765             $self->{Logger}->debug("Calling $matching_output_class callback") if $self->{Debug}{logic};
766             $callback->($frame);
767             }
768              
769             # Dequeue anything that was blocked by this
770             foreach my $output (@{ $details->{process_after} }) {
771             $self->{Logger}->debug("Dequeueing items that blocked due to '$method_frame_class'") if $self->{Debug}{logic};
772             $kernel->post($self->{Alias}, server_send => @$output);
773             }
774              
775             # Consider this frame handled
776             $handled++;
777             }
778             }
779              
780             # Act upon connection-level methods
781             if (! $handled && $frame->channel == 0) {
782             if ($method_frame->isa('Net::AMQP::Protocol::Connection::Start')) {
783             $kernel->post($self->{Alias}, server_send =>
784             Net::AMQP::Protocol::Connection::StartOk->new(
785             client_properties => {
786             platform => 'Perl/POE',
787             product => __PACKAGE__,
788             information => 'http://code.xmission.com/',
789             version => $VERSION,
790             },
791             mechanism => 'AMQPLAIN', # TODO - ensure this is in $method_frame{mechanisms}
792             response => { LOGIN => $self->{Username}, PASSWORD => $self->{Password} },
793             locale => 'en_US',
794             ),
795             );
796             $handled++;
797             }
798             elsif ($method_frame->isa('Net::AMQP::Protocol::Connection::Tune')) {
799             $self->{frame_max} = $method_frame->frame_max;
800             $kernel->post($self->{Alias}, server_send =>
801             Net::AMQP::Protocol::Connection::TuneOk->new(
802             channel_max => 0,
803             frame_max => $method_frame->frame_max,
804             heartbeat => 0,
805             ),
806             Net::AMQP::Frame::Method->new(
807             synchronous_callback => sub {
808             $kernel->post($self->{Alias}, 'server_connected');
809             },
810             method_frame => Net::AMQP::Protocol::Connection::Open->new(
811             virtual_host => $self->{VirtualHost},
812             capabilities => '',
813             insist => 1,
814             ),
815             ),
816             );
817             $handled++;
818             }
819             }
820             }
821              
822             if (! $handled) {
823             $self->{Logger}->error("Unhandled input frame ".ref($frame));
824             }
825             }
826             }
827              
828             sub tcp_server_error {
829             my $self = shift;
830             my ($kernel, $heap, $name, $num, $string) = @_[KERNEL, HEAP, ARG0, ARG1, ARG2];
831              
832             # Normal disconnection
833             if ($name eq 'read' && $num == 0 && $self->{is_stopping}) {
834             return;
835             }
836              
837             $self->{Logger}->error("TCP error: $name (num: $num, string: $string)");
838             }
839              
840             sub tcp_connect_error {
841             my $self = shift;
842             my ($kernel, $heap, $name, $num, $string) = @_[KERNEL, HEAP, ARG0, ARG1, ARG2];
843              
844             $self->{Logger}->error("TCP connect error: $name (num: $num, string: $string)");
845             $kernel->post($self->{AliasTCP}, 'reconnect_delayed') if $self->{Reconnect};
846             }
847              
848             sub tcp_disconnected {
849             my $self = shift;
850             my ($kernel, $heap) = @_[KERNEL, HEAP];
851              
852             $self->{Logger}->error("TCP connection is disconnected");
853              
854             # The flag 'is_stopping' will be 1 if server_disconnect was explicitly called
855             return if $self->{is_stopping};
856              
857             # We are here due to an error; we should record that we're stopped, and try and reconnect
858             $self->{is_stopped} = 1;
859             $self->{is_started} = 0;
860             $self->{wait_synchronous} = {};
861              
862             if ($self->{Reconnect}) {
863             $kernel->post($self->{AliasTCP}, 'reconnect_delayed');
864             }
865              
866             $self->do_callback('Disconnected');
867             }
868              
869             sub tcp_reconnect_delayed {
870             my $self = shift;
871             my ($kernel, $heap) = @_[KERNEL, HEAP];
872              
873             return unless $self->{Reconnect};
874              
875             # Pick a new RemoteAddress if there's more than one
876             if (ref $self->{RemoteAddress}) {
877             $self->{current_RemoteAddress} = shift @{ $self->{RemoteAddress} };
878             push @{ $self->{RemoteAddress} }, $self->{current_RemoteAddress};
879             }
880              
881             my $delay = 2 ** ++$self->{reconnect_attempt};
882             $self->{Logger}->info("Reconnecting to '$$self{current_RemoteAddress}' in $delay sec");
883              
884             # This state is in the TCP session, so we can call 'reconnect' directly
885             $kernel->delay('reconnect', $delay, $self->{current_RemoteAddress}, $self->{RemotePort});
886             }
887              
888             sub do_callback {
889             my ($self, $callback, @args) = @_;
890              
891             return unless $self->{Callbacks}{$callback};
892             foreach my $subref (@{ $self->{Callbacks}{$callback} }) {
893             $subref->($self, @args);
894             }
895             return;
896             }
897              
898             {
899             package POE::Component::Client::AMQP::FakeLogger;
900              
901             use strict;
902             use warnings;
903              
904             sub new {
905             my ($class, %self) = @_;
906             return bless \%self, $class;
907             }
908              
909             sub info { shift->log_it('INFO', @_) }
910             sub error { shift->log_it('ERROR', @_) }
911             sub debug { shift->log_it('DEBUG', @_) }
912              
913             sub log_it {
914             my ($self, $method, $message) = @_;
915             return if $method eq 'DEBUG' && ! $self->{debug};
916             chomp $message;
917             print '[' . localtime(time) ."] $method: $message\n";
918             }
919             }
920              
921             =head1 SEE ALSO
922              
923             L, L
924              
925             =head1 DEVELOPMENT
926              
927             This module is being developed via a git repository publicly avaiable at http://github.com/ewaters/poe-component-client-amqp. I encourage anyone who is interested to fork my code and contribute bug fixes or new features, or just have fun and be creative.
928              
929             =head1 COPYRIGHT
930              
931             Copyright (c) 2009 Eric Waters and XMission LLC (http://www.xmission.com/). All rights reserved. This program is free software; you can redistribute it and/or modify it under the same terms as Perl itself.
932              
933             The full text of the license can be found in the LICENSE file included with this module.
934              
935             =head1 AUTHOR
936              
937             Eric Waters
938              
939             =cut
940              
941             1;