File Coverage

blib/lib/Test/Net/RabbitMQ.pm
Criterion Covered Total %
statement 152 162 93.8
branch 86 126 68.2
condition 15 24 62.5
subroutine 24 26 92.3
pod 18 18 100.0
total 295 356 82.8


line stmt bran cond sub pod time code
1             package Test::Net::RabbitMQ;
2 9     9   929956 use Moose;
  9         4286819  
  9         68  
3 9     9   66920 use warnings;
  9         25  
  9         341  
4 9     9   60 use strict;
  9         19  
  9         342  
5              
6 9     9   6117 use Math::Int64 0.34 qw( uint64 );
  9         24431  
  9         52  
7              
8             our $VERSION = '0.13';
9              
10             # ABSTRACT: A mock RabbitMQ implementation for use when testing.
11              
12              
13             # Bindings are stored in the following form:
14             # {
15             # exchange_name => {
16             # regex => queue_name
17             # },
18             # ...
19             # }
20             has bindings => (
21             traits => [ qw(Hash) ],
22             is => 'rw',
23             isa => 'HashRef',
24             default => sub { {} },
25             );
26              
27              
28             has connectable => (
29             is => 'rw',
30             isa => 'Bool',
31             default => 1
32             );
33              
34             has connected => (
35             is => 'rw',
36             isa => 'Bool',
37             default => 0
38             );
39              
40             has channels => (
41             traits => [ qw(Hash) ],
42             is => 'rw',
43             isa => 'HashRef',
44             default => sub { {} },
45             handles => {
46             _channel_exists => 'exists',
47             _get_channel => 'get',
48             _remove_channel => 'delete',
49             _set_channel => 'set',
50             }
51             );
52              
53              
54             has debug => (
55             is => 'rw',
56             isa => 'Bool',
57             default => 0
58             );
59              
60             has exchanges => (
61             traits => [ qw(Hash) ],
62             is => 'rw',
63             isa => 'HashRef',
64             default => sub { {} },
65             handles => {
66             _exchange_exists => 'exists',
67             _get_exchange => 'get',
68             _remove_exchange => 'delete',
69             _set_exchange => 'set',
70             }
71             );
72              
73             has queue => (
74             is => 'rw',
75             isa => 'Str',
76             predicate => '_has_queue',
77             clearer => '_clear_queue',
78             );
79              
80             has queues => (
81             traits => [ qw(Hash) ],
82             is => 'rw',
83             isa => 'HashRef',
84             default => sub { {} },
85             handles => {
86             _queue_exists => 'exists',
87             _get_queue => 'get',
88             _remove_queue => 'delete',
89             _set_queue => 'set',
90             }
91             );
92              
93             has delivery_tag => (
94             is => 'rw',
95             isa => 'Math::UInt64',
96             default => sub { uint64(0) },
97             clearer => '_reset_delivery_tag',
98             writer => '_set_delivery_tag',
99             );
100              
101             sub _inc_delivery_tag {
102 12     12   20 my $self = shift;
103 12         322 $self->_set_delivery_tag( $self->delivery_tag + 1 );
104             }
105              
106             sub _dec_delivery_tag {
107 0     0   0 my $self = shift;
108 0         0 $self->_set_delivery_tag( $self->delivery_tag - 1 );
109             }
110              
111             has _tx_messages => (
112             is => 'ro',
113             isa => 'HashRef',
114             default => sub{ {} },
115             );
116              
117              
118             sub channel_close {
119 2     2 1 394 my ($self, $channel) = @_;
120              
121 2 50       65 die "Not connected" unless $self->connected;
122              
123 2 100       72 die "Unknown channel: $channel" unless $self->_channel_exists($channel);
124              
125 1         37 $self->_remove_channel($channel);
126             }
127              
128              
129             sub channel_open {
130 8     8 1 2369 my ($self, $channel) = @_;
131              
132 8 100       231 die "Not connected" unless $self->connected;
133              
134 7         244 $self->_set_channel($channel, 1);
135             }
136              
137              
138             sub connect {
139 10     10 1 8978 my ($self) = @_;
140              
141 10 100       345 die('Unable to connect!') unless $self->connectable;
142              
143 9         251 $self->connected(1);
144             }
145              
146              
147             my $ctag = 0;
148             sub consume {
149 6     6 1 802 my ($self, $channel, $queue, $options) = @_;
150              
151 6 100       173 die "Not connected" unless $self->connected;
152              
153 5 50       172 die "Unknown channel" unless $self->_channel_exists($channel);
154              
155 5 50       168 die "Unknown queue" unless $self->_queue_exists($queue);
156              
157 5         35 $options = $self->_apply_defaults( $options, {
158             no_local => 0,
159             no_ack => 1,
160             exclusive => 0,
161             });
162              
163 5 100       33 die "no_ack=>0 is not supported at this time" if !$options->{no_ack};
164              
165 4         121 $self->queue($queue);
166              
167             return exists $options->{consumer_tag}
168             ? $options->{consumer_tag}
169 4 50       46 : 'consumer-tag-' . $ctag++;
170             }
171              
172              
173             sub cancel {
174 6     6 1 1387 my ($self, $channel, $consumer_tag) = @_;
175              
176 6 100       210 die "Not connected" unless $self->connected;
177              
178 5 100 66     46 die "You must provide a consumer tag"
179             unless defined $consumer_tag && length $consumer_tag;
180              
181 2 100       64 return 0 unless $self->_has_queue;
182              
183 1         31 $self->_clear_queue;
184              
185 1         4 return 1;
186             }
187              
188              
189             sub disconnect {
190 2     2 1 614 my ($self) = @_;
191              
192 2 50       70 die "Not connected" unless $self->connected;
193              
194 2         54 $self->connected(0);
195             }
196              
197              
198             sub exchange_declare {
199 6     6 1 422 my ($self, $channel, $exchange, $options) = @_;
200              
201 6 50       171 die "Not connected" unless $self->connected;
202              
203 6 50       211 die "Unknown channel" unless $self->_channel_exists($channel);
204              
205 6         210 $self->_set_exchange($exchange, 1);
206             }
207              
208              
209             sub exchange_delete {
210 1     1 1 350 my ($self, $channel, $exchange, $options) = @_;
211              
212 1 50       36 die "Not connected" unless $self->connected;
213              
214 1 50       35 die "Unknown channel" unless $self->_channel_exists($channel);
215              
216 1         37 $self->_remove_exchange($exchange);
217             }
218              
219              
220             sub tx_select {
221 4     4 1 1750 my ($self, $channel) = @_;
222              
223 4 100       97 die "Not connected" unless $self->connected;
224              
225 3 100       86 die "Unknown channel: $channel" unless $self->_channel_exists($channel);
226              
227 2         42 my $messages = $self->_tx_messages->{ $channel };
228 2 50       4 die "Transaction already started" if $messages;
229              
230 2         42 $self->_tx_messages->{ $channel } = [];
231             }
232              
233              
234             sub tx_commit {
235 4     4 1 814 my ($self, $channel) = @_;
236              
237 4 100       102 die "Not connected" unless $self->connected;
238              
239 3 100       93 die "Unknown channel: $channel" unless $self->_channel_exists($channel);
240              
241 2         48 my $messages = $self->_tx_messages->{ $channel };
242 2 100       11 die "Transaction not yet started" unless $messages;
243              
244 1         2 foreach my $message (@$messages) {
245 1         4 $self->_publish( $channel, @$message );
246             }
247              
248 1         22 delete $self->_tx_messages->{ $channel };
249             }
250              
251              
252             sub tx_rollback {
253 4     4 1 1152 my ($self, $channel) = @_;
254              
255 4 100       106 die "Not connected" unless $self->connected;
256              
257 3 100       85 die "Unknown channel: $channel" unless $self->_channel_exists($channel);
258              
259 2         43 my $messages = $self->_tx_messages->{ $channel };
260 2 100       14 die "Transaction not yet started" unless $messages;
261              
262 1         22 delete $self->_tx_messages->{ $channel };
263             }
264              
265              
266             sub get {
267 9     9 1 2725 my ($self, $channel, $queue, $options) = @_;
268              
269 9 50       275 die "Not connected" unless $self->connected;
270              
271 9 50       350 die "Unknown channel" unless $self->_channel_exists($channel);
272              
273 9 50       302 die "Unknown queue: $queue" unless $self->_queue_exists($queue);
274              
275 9         14 my $message = shift(@{ $self->_get_queue($queue) });
  9         289  
276              
277 9 100       27 return undef unless defined($message);
278              
279 8         21 $message->{delivery_tag} = $self->_inc_delivery_tag;
280 8         21 $message->{content_type} = '';
281 8         15 $message->{redelivered} = 0;
282 8         23 $message->{message_count} = 0;
283              
284 8         21 return $message;
285             }
286              
287              
288             sub queue_bind {
289 9     9 1 57 my ($self, $channel, $queue, $exchange, $pattern) = @_;
290              
291 9 50       249 die "Not connected" unless $self->connected;
292              
293 9 50       338 die "Unknown channel: $channel" unless $self->_channel_exists($channel);
294              
295 9 50       373 die "Unknown queue: $queue" unless $self->_queue_exists($queue);
296              
297 9 50       324 die "Unknown exchange: $exchange" unless $self->_exchange_exists($exchange);
298              
299 9   100     248 my $binds = $self->bindings->{$exchange} || {};
300              
301             # Turn the pattern we're given into an actual regex
302 9         21 my $regex = $pattern;
303 9 100 100     56 if(($pattern =~ /\#/) || ($pattern =~ /\*/)) {
304 3 100       10 if($pattern =~ /\#/) {
    50          
305 2         6 $regex =~ s/\#/\.\*/g;
306             } elsif($pattern =~ /\*/) {
307 1         5 $regex =~ s/\*/\[^\.]\*/g;
308             }
309 3         8 $regex = '^'.$regex.'$';
310 3         51 $regex = qr($regex);
311             } else {
312 6         108 $regex = qr/^$pattern$/;
313             }
314              
315             # $self->_set_binding($routing_key, { queue => $queue, exchange => $exchange });
316 9         36 $binds->{$regex} = $queue;
317              
318             # In case these are new bindings
319 9         273 $self->bindings->{$exchange} = $binds;
320             }
321              
322              
323             my $queue = 0;
324             sub queue_declare {
325 12     12 1 1998 my ($self, $channel, $queue, $options) = @_;
326              
327 12 50       380 die "Not connected" unless $self->connected;
328              
329 12 50       411 die "Unknown channel: $channel" unless $self->_channel_exists($channel);
330              
331 12 100       39 if ($options->{passive}) {
332             # Would rabbitmq die if $queue was undef or q{}?
333             return
334 2 50 33     77 unless defined $queue
      33        
335             && length $queue
336             && $self->_queue_exists($queue);
337             }
338             else {
339 10 50 33     62 $queue = 'queue-' . $queue++
340             unless defined $queue && length $queue;
341 10 100       349 $self->_set_queue($queue, []) unless $self->_queue_exists($queue);
342             }
343              
344 12 100       46 return $queue unless wantarray;
345             return (
346             $queue,
347 3 50 33     5 scalar @{ $self->_get_queue($queue) },
  3         101  
348             $self->queue && $self->queue eq $queue ? 1 : 0,
349             );
350             }
351              
352              
353             sub queue_delete {
354 1     1 1 1062 my ($self, $channel, $queue, $options) = @_;
355              
356 1 50       36 die "Not connected" unless $self->connected;
357              
358 1 50       36 die "Unknown channel" unless $self->_channel_exists($channel);
359              
360 1         35 $self->_remove_queue($queue);
361             }
362              
363              
364             sub queue_unbind {
365 0     0 1 0 my ($self, $channel, $queue, $exchange, $routing_key) = @_;
366              
367 0 0       0 die "Not connected" unless $self->connected;
368              
369 0 0       0 die "Unknown channel: $channel" unless $self->_channel_exists($channel);
370              
371 0 0       0 die "Unknown queue: $queue" unless $self->_queue_exists($queue);
372              
373 0 0       0 die "Unknown exchange: $queue" unless $self->_exchange_exists($exchange);
374              
375 0 0       0 die "Unknown routing: $routing_key" unless $self->_binding_exists($routing_key);
376              
377 0         0 $self->_remove_binding($routing_key);
378             }
379              
380              
381             sub publish {
382 11     11 1 1526 my $self = shift;
383 11         19 my $channel = shift;
384              
385 11 100       310 die "Not connected" unless $self->connected;
386              
387 10 50       331 die "Unknown channel: $channel" unless $self->_channel_exists($channel);
388              
389 10         265 my $messages = $self->_tx_messages->{ $channel };
390 10 100       25 if ($messages) {
391 2         5 push @$messages, [ @_ ];
392 2         4 return;
393             }
394              
395 8         28 $self->_publish( $channel, @_ );
396             }
397              
398             sub _publish {
399 9     9   30 my ($self, $channel, $routing_key, $body, $options, $props) = @_;
400              
401 9         14 my $exchange = $options->{exchange};
402 9 50       38 unless($exchange) {
403 0         0 $exchange = 'amq.direct';
404             }
405              
406 9 50       310 die "Unknown exchange: $exchange" unless $self->_exchange_exists($exchange);
407              
408             # Get the bindings for the specified exchange and test each key to see
409             # if our routing key matches. If it does, push it into the queue
410 9         246 my $binds = $self->bindings->{$exchange};
411 9         23 foreach my $pattern (keys %{ $binds }) {
  9         35  
412 18 100       288 if($routing_key =~ $pattern) {
413 13 50       388 print STDERR "Publishing '$routing_key' to ".$binds->{$pattern}."\n" if $self->debug;
414 13   100     101 my $message = {
415             body => $body,
416             routing_key => $routing_key,
417             exchange => $exchange,
418             props => $props || {},
419             };
420 13         24 push(@{ $self->_get_queue($binds->{$pattern}) }, $message);
  13         440  
421             }
422             }
423             }
424              
425              
426             sub recv {
427 6     6 1 2543 my ($self) = @_;
428              
429 6 100       202 die "Not connected" unless $self->connected;
430              
431 5         123 my $queue = $self->queue;
432 5 50       18 die "No queue, did you consume() first?" unless defined($queue);
433              
434 5         8 my $message = shift(@{ $self->_get_queue($self->queue) });
  5         114  
435              
436 5 100       26 return undef unless defined $message;
437              
438 4         16 $message->{delivery_tag} = $self->_inc_delivery_tag;
439 4         11 $message->{consumer_tag} = '';
440 4         10 $message->{redelivered} = 0;
441              
442 4         20 return $message;
443             }
444              
445             sub _apply_defaults {
446 5     5   16 my ($self, $args, $defaults) = @_;
447              
448 5   100     32 $args ||= {};
449 5         11 my $new_args = {};
450              
451 5         20 foreach my $key (keys %$args) {
452 2         5 $new_args->{$key} = $args->{$key};
453             }
454              
455 5         17 foreach my $key (keys %$defaults) {
456 15 100       32 next if exists $new_args->{$key};
457 13         25 $new_args->{$key} = $defaults->{$key};
458             }
459              
460 5         17 return $new_args;
461             }
462              
463             1;
464              
465             __END__
466              
467             =pod
468              
469             =head1 NAME
470              
471             Test::Net::RabbitMQ - A mock RabbitMQ implementation for use when testing.
472              
473             =head1 VERSION
474              
475             version 0.13
476              
477             =head1 SYNOPSIS
478              
479             use Test::Net::RabbitMQ;
480              
481             my $mq = Test::Net::RabbitMQ->new;
482              
483             $mq->connect;
484              
485             $mq->channel_open(1);
486              
487             $mq->exchange_declare(1, 'order');
488             $mq->queue_declare(1, 'new-orders');
489              
490             $mq->queue_bind(1, 'new-orders', 'order', 'order.new');
491              
492             $mq->publish(1, 'order.new', 'hello!', { exchange => 'order' });
493              
494             $mq->consume(1, 'new-orders');
495              
496             my $msg = $mq->recv;
497              
498             # Or
499              
500             my $msg = $mq->get(1, 'order.new', {});
501              
502             =head1 DESCRIPTION
503              
504             Test::Net::RabbitMQ is a terrible approximation of using the real thing, but
505             hopefully will allow you to test systems that use L<Net::AMQP::RabbitMQ> or
506             L<Net::RabbitMQ> without having to use an actual RabbitMQ instance.
507              
508             The general overview is that calls to C<publish> pushes a message into one
509             or more queues (or none if there are no bindings) and calls to C<recv>
510             pop them.
511              
512             =head1 CAVEATS
513              
514             This module has all the features I've needed to successfully test our
515             RabbitMQ-using application. Patches are welcome if I'm missing something you
516             need! At the moment there are a number of shortcomings:
517              
518             =over 4
519              
520             =item C<recv> doesn't block
521              
522             =item exchanges are all topic
523              
524             =item lots of other stuff!
525              
526             =back
527              
528             =head1 ATTRIBUTES
529              
530             =head2 connectable
531              
532             If false then any calls to connect will die to emulate a failed connection.
533              
534             =head2 debug
535              
536             If set to true (which you can do at any time) then a message will be emitted
537             to STDERR any time a message is added to a queue.
538              
539             =head1 METHODS
540              
541             =head2 channel_close($number)
542              
543             Closes the specific channel.
544              
545             =head2 channel_open($number)
546              
547             Opens a channel with the specific number.
548              
549             =head2 connect
550              
551             Connects this instance. Does nothing except set C<connected> to true. Will
552             throw an exception if you've set C<connectable> to false.
553              
554             =head2 consume($channel, $queue)
555              
556             Sets the queue that will be popped when C<recv> is called.
557              
558             =head2 cancel($channel, $consumer_tag)
559              
560             Cancels the subscription for the given consumer tag. Calls to C<recv> after
561             this will throw an error unless you call C<consume> again. This method always
562             returns true if there is a subscription to cancel, false otherwise.
563              
564             =head2 disconnect
565              
566             Disconnects this instance by setting C<connected> to false.
567              
568             =head2 exchange_declare($channel, $exchange, $options)
569              
570             Creates an exchange of the specified name.
571              
572             =head2 exchange_delete($channel, $exchange, $options)
573              
574             Deletes an exchange of the specified name.
575              
576             =head2 tx_select($channel)
577              
578             Begins a transaction on the specified channel. From this point forward all
579             publish() calls on the channel will be buffered until a call to L</tx_commit>
580             or L</tx_rollback> is made.
581              
582             =head2 tx_commit($channel)
583              
584             Commits a transaction on the specified channel, causing all buffered publish()
585             calls to this point to be published.
586              
587             =head2 tx_rollback($channel)
588              
589             Rolls the transaction back, causing all buffered publish() calls to be wiped.
590              
591             =head2 get ($channel, $queue, $options)
592              
593             Get a message from the queue, if there is one.
594              
595             Like C<Net::RabbitMQ>, this will return a hash containing the following
596             information:
597              
598             {
599             body => 'Magic Transient Payload', # the reconstructed body
600             routing_key => 'nr_test_q', # route the message took
601             exchange => 'nr_test_x', # exchange used
602             delivery_tag => uint64(1), # (inc'd every recv or get)
603             redelivered => 0, # always 0
604             message_count => 0, # always 0
605             }
606              
607             =head2 queue_bind($channel, $queue, $exchange, $routing_key)
608              
609             Binds the specified queue to the specified exchange using the provided
610             routing key. B<Note that, at the moment, this doesn't work with AMQP wildcards.
611             Only with exact matches of the routing key.>
612              
613             =head2 queue_declare($channel, $queue, $options)
614              
615             Creates a queue of the specified name.
616              
617             =head2 queue_delete($channel, $queue, $options)
618              
619             Deletes a queue of the specified name.
620              
621             =head2 queue_unbind($channel, $queue, $exchange, $routing_key)
622              
623             Unbinds the specified routing key from the provided queue and exchange.
624              
625             =head2 publish($channel, $routing_key, $body, $options)
626              
627             Publishes the specified body with the supplied routing key. If there is a
628             binding that matches then the message will be added to the appropriate queue(s).
629              
630             =head2 recv
631              
632             Provided you've called C<consume> then calls to recv will C<pop> the next
633             message of the queue. B<Note that this method does not block.>
634              
635             Like C<Net::RabbitMQ>, this will return a hash containing the following
636             information:
637              
638             {
639             body => 'Magic Transient Payload', # the reconstructed body
640             routing_key => 'nr_test_q', # route the message took
641             exchange => 'nr_test_x', # exchange used
642             delivery_tag => uint64(1), # (inc'd every recv or get)
643             redelivered => $boolean # if message is redelivered
644             consumer_tag => '', # Always blank currently
645             props => $props, # hashref sent in
646             }
647              
648             =head1 AUTHOR
649              
650             Cory G Watson <gphat@cpan.org>
651              
652             =head1 COPYRIGHT AND LICENSE
653              
654             This software is copyright (c) 2015 by Cory G Watson.
655              
656             This is free software; you can redistribute it and/or modify it under
657             the same terms as the Perl 5 programming language system itself.
658              
659             =cut