File Coverage

blib/lib/Net/RabbitMQ/Channel.pm
Criterion Covered Total %
statement 4 6 66.6
branch n/a
condition n/a
subroutine 2 2 100.0
pod n/a
total 6 8 75.0


line stmt bran cond sub pod time code
1             package Net::RabbitMQ::Channel;
2              
3 1     1   910 use Class::Easy;
  1         2  
  1         9  
4              
5 1     1   578 use Net::RabbitMQ;
  0            
  0            
6              
7             unless ($^O eq 'MSWin32') {
8             use Sys::SigAction;
9             }
10              
11             use Net::RabbitMQ::Exchange;
12             use Net::RabbitMQ::Queue;
13              
14             our $VERSION = '0.05';
15              
16             # has 'mq';
17              
18             has exchange_pack => 'Net::RabbitMQ::Exchange';
19             has queue_pack => 'Net::RabbitMQ::Queue';
20              
21             has 'number';
22              
23             our @NO_CHANNEL_CALLS = ('recv');
24              
25             sub new {
26             my $class = shift;
27             my $number = shift;
28             my $config = {@_};
29            
30             $config->{mq} = Net::RabbitMQ->new;
31             $config->{number} = $number;
32            
33             $config->{reconnect_timeout} ||= 10;
34            
35             my $self = bless $config, $class;
36              
37             if ($self->_confirmed_connect) {
38             return $self;
39             }
40            
41             die "can't open connection";
42             # if channel didn't open, then we died before this string
43             }
44              
45             sub exchange_declare {
46             my $self = shift;
47             my $name = shift;
48             my $args = {@_};
49            
50             $args->{package} = $self->exchange_pack
51             unless exists $args->{package};
52            
53             $args->{package}->new ($self, $name, %$args);
54             }
55              
56             sub queue_declare {
57             my $self = shift;
58             my $name = shift;
59             my $args = {@_};
60            
61             $args->{package} = $self->queue_pack
62             unless exists $args->{package};
63            
64             return $args->{package}->new ($self, $name, %$args);
65             }
66              
67             sub _failed_host_sort_sub {
68             my ($hosts, $sort_a, $sort_b) = @_;
69             return !exists $hosts->{$sort_a}->{failed}
70             ? -1
71             : !exists $hosts->{$sort_b}->{failed}
72             ? 1
73             : $hosts->{$sort_b}->{failed} <=> $hosts->{$sort_a}->{failed}
74             }
75              
76              
77             # here we try to connect, if all servers fails, then die
78             sub _confirmed_connect {
79             my $self = shift;
80             my $is_reconnect = shift;
81            
82             my $mq = $self->{mq};
83            
84             local $@;
85            
86             my $success = 0;
87            
88             my $hosts = $self->{hosts};
89            
90             foreach my $host (sort {_failed_host_sort_sub ($hosts, $a, $b)} keys %$hosts
91             ) {
92             my $last_failed = $hosts->{$host}->{failed};
93             if (defined $last_failed and (time - $last_failed < $self->{reconnect_timeout})) {
94             die "no hosts can be reached within $self->{reconnect_timeout} sec. you must check hosts availability or enlarge your interval";
95             }
96            
97             eval {
98             if ($^O ne 'MSWin32') {
99             # warn "setting alarm";
100             my $sig_handler = sub {
101             # failed disconnect is safe solution
102             # warn "alarm called";
103             $self->{hosts}->{$host}->{failed} = time;
104             $success = 0;
105             die "timeout";
106             };
107             my $h = Sys::SigAction::set_sig_handler ('ALRM', $sig_handler);
108             $h->{ACT}->{HANDLER} = $sig_handler;
109             #use Data::Dumper;
110             #warn Dumper $h;
111             #$SIG{'ALRM'} = sub {die;};
112             alarm ($self->{timeout} || 10);
113             }
114              
115             #warn "before connect";
116              
117             $mq->connect ($host, $self->{hosts}->{$host});
118             $self->_do ('channel_open')
119             unless $is_reconnect;
120             $success = 1;
121            
122             #warn "after connect";
123              
124             alarm 0;
125             };
126            
127             #warn "after eval, before alarm";
128              
129             alarm 0;
130             #warn "after eval";
131              
132             if ($success) {
133             delete $hosts->{$host}->{failed};
134             return 1
135             } else {
136             $self->{hosts}->{$host}->{failed} = time;
137             }
138             }
139            
140             die "we can't connect to any provided server: $@";
141             }
142              
143             sub _do {
144             my $self = shift;
145             my $cmd = shift;
146            
147             my $verify = "_verify_$cmd";
148            
149             # parameter verification
150             if ($self->can ($verify)) {
151             return unless $self->$verify (@_);
152             }
153            
154             local $@;
155            
156             my $result;
157             my $success = 0;
158            
159             # warn "cmd: $cmd, num: ", $self->number, ", array: @_";
160            
161             # use warnings FATAL => qw(uninitialized);
162              
163             my @params = @_;
164            
165             unshift @params, $self->number
166             unless scalar grep {$_ eq $cmd} @NO_CHANNEL_CALLS;
167            
168             # real server work -> we must restart connection after failure
169             eval {
170             # warn "$cmd ", $self->number;
171             $result = $self->{mq}->$cmd (@params);
172             $success = 1;
173             };
174            
175             #no warnings qw(uninitialized);
176             #debug "command: $cmd, result: $result, success: $success";
177             #use warnings qw(uninitialized);
178            
179             # TODO: check for real connection error, we don't want to run erratical command another time
180             unless ($success) {
181            
182             # warn $@;
183            
184             $self->_confirmed_connect (1); # send flag for reconnect
185             $self->{mq}->channel_open ($self->number);
186            
187             # if we have more than one failure after successful
188             # reconnect, then we must die
189             $result = $self->{mq}->$cmd (@params);
190             $success = 1;
191             }
192            
193             return wantarray ? ($success, $result) : $success;
194             }
195              
196             sub publish {
197             my $self = shift;
198             my $routing_key = shift;
199             my $body = shift;
200             my $props = {@_};
201            
202             my $opts = {};
203            
204             foreach my $k (keys %$props) {
205             if ($k eq 'exchange' or $k eq 'mandatory' or $k eq 'immediate') {
206             $opts->{$k} = delete $props->{$k};
207             }
208             }
209            
210             my ($success, $result) = $self->_do ('publish', $routing_key, $body, $opts, $props);
211             }
212              
213             sub close {
214             my $self = shift;
215            
216            
217             }
218              
219             1;
220              
221             =head1 NAME
222              
223             Net::RabbitMQ::Channel - use rabbitmq, OOP style
224              
225             =head1 SYNOPSIS
226              
227             use Net::RabbitMQ::Channel;
228              
229             my $channel = Net::RabbitMQ::Channel->new (1, {
230             hosts => {
231             rabbit1 => {user => 'guest', pass => 'guest'},
232             rabbit2 => {user => 'guest', pass => 'guest'}
233             }
234             });
235              
236             my $exchange = $channel->exchange_declare (
237             'test.x',
238             exchange_type => "topic",
239             );
240              
241             my $publisher_key = 'test.*';
242              
243             # consumer part
244             my $queue = $channel->queue_declare (
245             'test.q',
246             exclusive => 0,
247             );
248              
249             $queue->bind ($exchange, $publisher_key);
250              
251             # publisher part
252             $exchange->publish ($publisher_key, $message,
253             app_id => 'test',
254             timestamp => time
255             );
256            
257             # consumer part
258             my $message = $queue->get;
259              
260             =head1 METHODS
261              
262             =head2 init
263              
264             =over 4
265              
266             =item new
267              
268             my $channel = Net::RabbitMQ::Channel->new (1, {
269             # mandatory
270             hosts => {host_name => {user => 'user_name', pass => 'password'}},
271             # optional
272             });
273              
274              
275             when creating Net::RabbitMQ::Channel you must provide
276             channel number and configuration options.
277              
278             in the current version only 'hosts' key is supported. each key for 'hosts' specifies
279             one rabbitmq broker configuration. if current broker connection fails, module tries
280             to reconnect to another one.
281              
282              
283             =cut
284              
285             =back
286              
287             =head2 working with channel
288              
289             =over 4
290              
291             =item exchange_declare
292              
293             declares exchange
294              
295             my $exchange = $self->exchange_declare (
296             'test.exchange',
297             package => 'My::Exchange', # Net::RabbitMQ::Exchange if omitted
298             passive => 0, # 0
299             durable => 1, # 1
300             auto_delete => 0, # 0
301             exchange_type => "topic"
302             );
303              
304              
305             =item queue_declare
306              
307             declares queue
308              
309              
310             my $queue = $self->queue_declare (
311             'test.queue',
312             package => 'My::Queue', # Net::RabbitMQ::Queue if omitted
313             passive => 0, # 0
314             durable => 1, # 1
315             auto_delete => 0, # 0
316             exclusive => 0
317             );
318              
319              
320             =item publish
321              
322             publish message to routing key
323              
324             a typical workflow for a producer role is: open channel, declare exchange,
325             and publish message via routing key
326              
327             please, note: queue for recieving that message must be declared
328             and binded to exchange using routing key prior to message publishing.
329              
330             $channel->publish ($publisher_key, $message, {exchange => $exchange->name}, {
331             # content_type => $string,
332             # content_encoding => $string,
333             # correlation_id => $string,
334             # reply_to => $string,
335             # expiration => $string,
336             # message_id => $string,
337             # type => $string,
338             # user_id => $string,
339             app_id => 'test',
340             # delivery_mode => $integer,
341             # priority => $integer,
342             timestamp => time
343              
344             });
345            
346              
347             =item close
348              
349             stub
350              
351             =cut
352              
353             =back
354              
355             =head1 AUTHOR
356              
357             Ivan Baktsheev, C<< >>
358              
359             =head1 BUGS
360              
361             Please report any bugs or feature requests to my email address,
362             or through the web interface at L.
363             I will be notified, and then you'll automatically be notified
364             of progress on your bug as I make changes.
365              
366             =head1 SUPPORT
367              
368              
369              
370             =head1 ACKNOWLEDGEMENTS
371              
372              
373              
374             =head1 COPYRIGHT & LICENSE
375              
376             Copyright 2010-2011 Ivan Baktsheev
377              
378             This program is free software; you can redistribute it and/or modify it
379             under the same terms as Perl itself.
380              
381              
382             =cut