File Coverage

blib/lib/Test/Net/RabbitMQ.pm
Criterion Covered Total %
statement 1 3 33.3
branch n/a
condition n/a
subroutine 1 1 100.0
pod n/a
total 2 4 50.0


line stmt bran cond sub pod time code
1             package Test::Net::RabbitMQ;
2 9     9   173003 use Moose;
  0            
  0            
3             use warnings;
4             use strict;
5              
6             our $VERSION = '0.12';
7              
8             # ABSTRACT: A mock RabbitMQ implementation for use when testing.
9              
10              
11             # Bindings are stored in the following form:
12             # {
13             # exchange_name => {
14             # regex => queue_name
15             # },
16             # ...
17             # }
18             has bindings => (
19             traits => [ qw(Hash) ],
20             is => 'rw',
21             isa => 'HashRef',
22             default => sub { {} },
23             );
24              
25              
26             has connectable => (
27             is => 'rw',
28             isa => 'Bool',
29             default => 1
30             );
31              
32             has connected => (
33             is => 'rw',
34             isa => 'Bool',
35             default => 0
36             );
37              
38             has channels => (
39             traits => [ qw(Hash) ],
40             is => 'rw',
41             isa => 'HashRef',
42             default => sub { {} },
43             handles => {
44             _channel_exists => 'exists',
45             _get_channel => 'get',
46             _remove_channel => 'delete',
47             _set_channel => 'set',
48             }
49             );
50              
51              
52             has debug => (
53             is => 'rw',
54             isa => 'Bool',
55             default => 0
56             );
57              
58             has exchanges => (
59             traits => [ qw(Hash) ],
60             is => 'rw',
61             isa => 'HashRef',
62             default => sub { {} },
63             handles => {
64             _exchange_exists => 'exists',
65             _get_exchange => 'get',
66             _remove_exchange => 'delete',
67             _set_exchange => 'set',
68             }
69             );
70              
71             has queue => (
72             is => 'rw',
73             isa => 'Str',
74             predicate => '_has_queue',
75             clearer => '_clear_queue',
76             );
77              
78             has queues => (
79             traits => [ qw(Hash) ],
80             is => 'rw',
81             isa => 'HashRef',
82             default => sub { {} },
83             handles => {
84             _queue_exists => 'exists',
85             _get_queue => 'get',
86             _remove_queue => 'delete',
87             _set_queue => 'set',
88             }
89             );
90              
91             has delivery_tag => (
92             traits => [ qw(Counter) ],
93             is => 'ro',
94             isa => 'Num',
95             default => 0,
96             handles => {
97             _inc_delivery_tag => 'inc',
98             _dec_delivery_tag => 'dec',
99             _reset_delivery_tag => 'reset',
100             },
101             );
102              
103             has _tx_messages => (
104             is => 'ro',
105             isa => 'HashRef',
106             default => sub{ {} },
107             );
108              
109              
110             sub channel_close {
111             my ($self, $channel) = @_;
112              
113             die "Not connected" unless $self->connected;
114              
115             die "Unknown channel: $channel" unless $self->_channel_exists($channel);
116              
117             $self->_remove_channel($channel);
118             }
119              
120              
121             sub channel_open {
122             my ($self, $channel) = @_;
123              
124             die "Not connected" unless $self->connected;
125              
126             $self->_set_channel($channel, 1);
127             }
128              
129              
130             sub connect {
131             my ($self) = @_;
132              
133             die('Unable to connect!') unless $self->connectable;
134              
135             $self->connected(1);
136             }
137              
138              
139             my $ctag = 0;
140             sub consume {
141             my ($self, $channel, $queue, $options) = @_;
142              
143             die "Not connected" unless $self->connected;
144              
145             die "Unknown channel" unless $self->_channel_exists($channel);
146              
147             die "Unknown queue" unless $self->_queue_exists($queue);
148              
149             $options = $self->_apply_defaults( $options, {
150             no_local => 0,
151             no_ack => 1,
152             exclusive => 0,
153             });
154              
155             die "no_ack=>0 is not supported at this time" if !$options->{no_ack};
156              
157             $self->queue($queue);
158              
159             return exists $options->{consumer_tag}
160             ? $options->{consumer_tag}
161             : 'consumer-tag-' . $ctag++;
162             }
163              
164              
165             sub cancel {
166             my ($self, $channel, $consumer_tag) = @_;
167              
168             die "Not connected" unless $self->connected;
169              
170             die "You must provide a consumer tag"
171             unless defined $consumer_tag && length $consumer_tag;
172              
173             return 0 unless $self->_has_queue;
174              
175             $self->_clear_queue;
176              
177             return 1;
178             }
179              
180              
181             sub disconnect {
182             my ($self) = @_;
183              
184             die "Not connected" unless $self->connected;
185              
186             $self->connected(0);
187             }
188              
189              
190             sub exchange_declare {
191             my ($self, $channel, $exchange, $options) = @_;
192              
193             die "Not connected" unless $self->connected;
194              
195             die "Unknown channel" unless $self->_channel_exists($channel);
196              
197             $self->_set_exchange($exchange, 1);
198             }
199              
200              
201             sub exchange_delete {
202             my ($self, $channel, $exchange, $options) = @_;
203              
204             die "Not connected" unless $self->connected;
205              
206             die "Unknown channel" unless $self->_channel_exists($channel);
207              
208             $self->_remove_exchange($exchange);
209             }
210              
211              
212             sub tx_select {
213             my ($self, $channel) = @_;
214              
215             die "Not connected" unless $self->connected;
216              
217             die "Unknown channel: $channel" unless $self->_channel_exists($channel);
218              
219             my $messages = $self->_tx_messages->{ $channel };
220             die "Transaction already started" if $messages;
221              
222             $self->_tx_messages->{ $channel } = [];
223             }
224              
225              
226             sub tx_commit {
227             my ($self, $channel) = @_;
228              
229             die "Not connected" unless $self->connected;
230              
231             die "Unknown channel: $channel" unless $self->_channel_exists($channel);
232              
233             my $messages = $self->_tx_messages->{ $channel };
234             die "Transaction not yet started" unless $messages;
235              
236             foreach my $message (@$messages) {
237             $self->_publish( $channel, @$message );
238             }
239              
240             delete $self->_tx_messages->{ $channel };
241             }
242              
243              
244             sub tx_rollback {
245             my ($self, $channel) = @_;
246              
247             die "Not connected" unless $self->connected;
248              
249             die "Unknown channel: $channel" unless $self->_channel_exists($channel);
250              
251             my $messages = $self->_tx_messages->{ $channel };
252             die "Transaction not yet started" unless $messages;
253              
254             delete $self->_tx_messages->{ $channel };
255             }
256              
257              
258             sub get {
259             my ($self, $channel, $queue, $options) = @_;
260              
261             die "Not connected" unless $self->connected;
262              
263             die "Unknown channel" unless $self->_channel_exists($channel);
264              
265             die "Unknown queue: $queue" unless $self->_queue_exists($queue);
266              
267             my $message = shift(@{ $self->_get_queue($queue) });
268              
269             return undef unless defined($message);
270              
271             $message->{delivery_tag} = $self->_inc_delivery_tag;
272             $message->{content_type} = '';
273             $message->{redelivered} = 0;
274             $message->{message_count} = 0;
275              
276             return $message;
277             }
278              
279              
280             sub queue_bind {
281             my ($self, $channel, $queue, $exchange, $pattern) = @_;
282              
283             die "Not connected" unless $self->connected;
284              
285             die "Unknown channel: $channel" unless $self->_channel_exists($channel);
286              
287             die "Unknown queue: $queue" unless $self->_queue_exists($queue);
288              
289             die "Unknown exchange: $exchange" unless $self->_exchange_exists($exchange);
290              
291             my $binds = $self->bindings->{$exchange} || {};
292              
293             # Turn the pattern we're given into an actual regex
294             my $regex = $pattern;
295             if(($pattern =~ /\#/) || ($pattern =~ /\*/)) {
296             if($pattern =~ /\#/) {
297             $regex =~ s/\#/\.\*/g;
298             } elsif($pattern =~ /\*/) {
299             $regex =~ s/\*/\[^\.]\*/g;
300             }
301             $regex = '^'.$regex.'$';
302             $regex = qr($regex);
303             } else {
304             $regex = qr/^$pattern$/;
305             }
306              
307             # $self->_set_binding($routing_key, { queue => $queue, exchange => $exchange });
308             $binds->{$regex} = $queue;
309              
310             # In case these are new bindings
311             $self->bindings->{$exchange} = $binds;
312             }
313              
314              
315             my $queue = 0;
316             sub queue_declare {
317             my ($self, $channel, $queue, $options) = @_;
318              
319             die "Not connected" unless $self->connected;
320              
321             die "Unknown channel: $channel" unless $self->_channel_exists($channel);
322              
323             if ($options->{passive}) {
324             # Would rabbitmq die if $queue was undef or q{}?
325             return
326             unless defined $queue
327             && length $queue
328             && $self->_queue_exists($queue);
329             }
330             else {
331             $queue = 'queue-' . $queue++
332             unless defined $queue && length $queue;
333             $self->_set_queue($queue, []) unless $self->_queue_exists($queue);
334             }
335              
336             return $queue unless wantarray;
337             return (
338             $queue,
339             scalar @{ $self->_get_queue($queue) },
340             $self->queue && $self->queue eq $queue ? 1 : 0,
341             );
342             }
343              
344              
345             sub queue_delete {
346             my ($self, $channel, $queue, $options) = @_;
347              
348             die "Not connected" unless $self->connected;
349              
350             die "Unknown channel" unless $self->_channel_exists($channel);
351              
352             $self->_remove_queue($queue);
353             }
354              
355              
356             sub queue_unbind {
357             my ($self, $channel, $queue, $exchange, $routing_key) = @_;
358              
359             die "Not connected" unless $self->connected;
360              
361             die "Unknown channel: $channel" unless $self->_channel_exists($channel);
362              
363             die "Unknown queue: $queue" unless $self->_queue_exists($queue);
364              
365             die "Unknown exchange: $queue" unless $self->_exchange_exists($exchange);
366              
367             die "Unknown routing: $routing_key" unless $self->_binding_exists($routing_key);
368              
369             $self->_remove_binding($routing_key);
370             }
371              
372              
373             sub publish {
374             my $self = shift;
375             my $channel = shift;
376              
377             die "Not connected" unless $self->connected;
378              
379             die "Unknown channel: $channel" unless $self->_channel_exists($channel);
380              
381             my $messages = $self->_tx_messages->{ $channel };
382             if ($messages) {
383             push @$messages, [ @_ ];
384             return;
385             }
386              
387             $self->_publish( $channel, @_ );
388             }
389              
390             sub _publish {
391             my ($self, $channel, $routing_key, $body, $options, $props) = @_;
392              
393             my $exchange = $options->{exchange};
394             unless($exchange) {
395             $exchange = 'amq.direct';
396             }
397              
398             die "Unknown exchange: $exchange" unless $self->_exchange_exists($exchange);
399              
400             # Get the bindings for the specified exchange and test each key to see
401             # if our routing key matches. If it does, push it into the queue
402             my $binds = $self->bindings->{$exchange};
403             foreach my $pattern (keys %{ $binds }) {
404             if($routing_key =~ $pattern) {
405             print STDERR "Publishing '$routing_key' to ".$binds->{$pattern}."\n" if $self->debug;
406             my $message = {
407             body => $body,
408             routing_key => $routing_key,
409             exchange => $exchange,
410             props => $props || {},
411             };
412             push(@{ $self->_get_queue($binds->{$pattern}) }, $message);
413             }
414             }
415             }
416              
417              
418             sub recv {
419             my ($self) = @_;
420              
421             die "Not connected" unless $self->connected;
422              
423             my $queue = $self->queue;
424             die "No queue, did you consume() first?" unless defined($queue);
425              
426             my $message = shift(@{ $self->_get_queue($self->queue) });
427              
428             return undef unless defined $message;
429              
430             $message->{delivery_tag} = $self->_inc_delivery_tag;
431             $message->{consumer_tag} = '';
432             $message->{redelivered} = 0;
433              
434             return $message;
435             }
436              
437             sub _apply_defaults {
438             my ($self, $args, $defaults) = @_;
439              
440             $args ||= {};
441             my $new_args = {};
442              
443             foreach my $key (keys %$args) {
444             $new_args->{$key} = $args->{$key};
445             }
446              
447             foreach my $key (keys %$defaults) {
448             next if exists $new_args->{$key};
449             $new_args->{$key} = $defaults->{$key};
450             }
451              
452             return $new_args;
453             }
454              
455             1;
456              
457             __END__
458              
459             =pod
460              
461             =head1 NAME
462              
463             Test::Net::RabbitMQ - A mock RabbitMQ implementation for use when testing.
464              
465             =head1 VERSION
466              
467             version 0.12
468              
469             =head1 SYNOPSIS
470              
471             use Test::Net::RabbitMQ;
472              
473             my $mq = Test::Net::RabbitMQ->new;
474              
475             $mq->connect;
476              
477             $mq->channel_open(1);
478              
479             $mq->exchange_declare(1, 'order');
480             $mq->queue_declare(1, 'new-orders');
481              
482             $mq->queue_bind(1, 'new-orders', 'order', 'order.new');
483              
484             $mq->publish(1, 'order.new', 'hello!', { exchange => 'order' });
485              
486             $mq->consume(1, 'new-orders');
487              
488             my $msg = $mq->recv;
489              
490             # Or
491              
492             my $msg = $mq->get(1, 'order.new', {});
493              
494             =head1 DESCRIPTION
495              
496             Test::Net::RabbitMQ is a terrible approximation of using the real thing, but
497             hopefully will allow you to test systems that use L<Net::AMQP::RabbitMQ> or
498             L<Net::RabbitMQ> without having to use an actual RabbitMQ instance.
499              
500             The general overview is that calls to C<publish> pushes a message into one
501             or more queues (or none if there are no bindings) and calls to C<recv>
502             pop them.
503              
504             =head1 CAVEATS
505              
506             This module has all the features I've needed to successfully test our
507             RabbitMQ-using application. Patches are welcome if I'm missing something you
508             need! At the moment there are a number of shortcomings:
509              
510             =over 4
511              
512             =item C<recv> doesn't block
513              
514             =item exchanges are all topic
515              
516             =item lots of other stuff!
517              
518             =back
519              
520             =head1 ATTRIBUTES
521              
522             =head2 connectable
523              
524             If false then any calls to connect will die to emulate a failed connection.
525              
526             =head2 debug
527              
528             If set to true (which you can do at any time) then a message will be emitted
529             to STDERR any time a message is added to a queue.
530              
531             =head1 METHODS
532              
533             =head2 channel_close($number)
534              
535             Closes the specific channel.
536              
537             =head2 channel_open($number)
538              
539             Opens a channel with the specific number.
540              
541             =head2 connect
542              
543             Connects this instance. Does nothing except set C<connected> to true. Will
544             throw an exception if you've set C<connectable> to false.
545              
546             =head2 consume($channel, $queue)
547              
548             Sets the queue that will be popped when C<recv> is called.
549              
550             =head2 cancel($channel, $consumer_tag)
551              
552             Cancels the subscription for the given consumer tag. Calls to C<recv> after
553             this will throw an error unless you call C<consume> again. This method always
554             returns true if there is a subscription to cancel, false otherwise.
555              
556             =head2 disconnect
557              
558             Disconnects this instance by setting C<connected> to false.
559              
560             =head2 exchange_declare($channel, $exchange, $options)
561              
562             Creates an exchange of the specified name.
563              
564             =head2 exchange_delete($channel, $exchange, $options)
565              
566             Deletes an exchange of the specified name.
567              
568             =head2 tx_select($channel)
569              
570             Begins a transaction on the specified channel. From this point forward all
571             publish() calls on the channel will be buffered until a call to L</tx_commit>
572             or L</tx_rollback> is made.
573              
574             =head2 tx_commit($channel)
575              
576             Commits a transaction on the specified channel, causing all buffered publish()
577             calls to this point to be published.
578              
579             =head2 tx_rollback($channel)
580              
581             Rolls the transaction back, causing all buffered publish() calls to be wiped.
582              
583             =head2 get ($channel, $queue, $options)
584              
585             Get a message from the queue, if there is one.
586              
587             Like C<Net::RabbitMQ>, this will return a hash containing the following
588             information:
589              
590             {
591             body => 'Magic Transient Payload', # the reconstructed body
592             routing_key => 'nr_test_q', # route the message took
593             exchange => 'nr_test_x', # exchange used
594             delivery_tag => 1, # (inc'd every recv or get)
595             redelivered => 0, # always 0
596             message_count => 0, # always 0
597             }
598              
599             =head2 queue_bind($channel, $queue, $exchange, $routing_key)
600              
601             Binds the specified queue to the specified exchange using the provided
602             routing key. B<Note that, at the moment, this doesn't work with AMQP wildcards.
603             Only with exact matches of the routing key.>
604              
605             =head2 queue_declare($channel, $queue, $options)
606              
607             Creates a queue of the specified name.
608              
609             =head2 queue_delete($channel, $queue, $options)
610              
611             Deletes a queue of the specified name.
612              
613             =head2 queue_unbind($channel, $queue, $exchange, $routing_key)
614              
615             Unbinds the specified routing key from the provided queue and exchange.
616              
617             =head2 publish($channel, $routing_key, $body, $options)
618              
619             Publishes the specified body with the supplied routing key. If there is a
620             binding that matches then the message will be added to the appropriate queue(s).
621              
622             =head2 recv
623              
624             Provided you've called C<consume> then calls to recv will C<pop> the next
625             message of the queue. B<Note that this method does not block.>
626              
627             Like C<Net::RabbitMQ>, this will return a hash containing the following
628             information:
629              
630             {
631             body => 'Magic Transient Payload', # the reconstructed body
632             routing_key => 'nr_test_q', # route the message took
633             exchange => 'nr_test_x', # exchange used
634             delivery_tag => 1, # (inc'd every recv or get)
635             redelivered => $boolean # if message is redelivered
636             consumer_tag => '', # Always blank currently
637             props => $props, # hashref sent in
638             }
639              
640             =head1 AUTHOR
641              
642             Cory G Watson <gphat@cpan.org>
643              
644             =head1 COPYRIGHT AND LICENSE
645              
646             This software is copyright (c) 2015 by Cory G Watson.
647              
648             This is free software; you can redistribute it and/or modify it under
649             the same terms as the Perl 5 programming language system itself.
650              
651             =cut