File Coverage

blib/lib/Test/Net/RabbitMQ.pm
Criterion Covered Total %
statement 1 3 33.3
branch n/a
condition n/a
subroutine 1 1 100.0
pod n/a
total 2 4 50.0


line stmt bran cond sub pod time code
1             package Test::Net::RabbitMQ;
2             $Test::Net::RabbitMQ::VERSION = '0.10';
3 8     8   148930 use Moose;
  0            
  0            
4             use warnings;
5             use strict;
6              
7             # ABSTRACT: A mock RabbitMQ implementation for use when testing.
8              
9              
10             # Bindings are stored in the following form:
11             # {
12             # exchange_name => {
13             # regex => queue_name
14             # },
15             # ...
16             # }
17             has bindings => (
18             traits => [ qw(Hash) ],
19             is => 'rw',
20             isa => 'HashRef',
21             default => sub { {} },
22             );
23              
24              
25             has connectable => (
26             is => 'rw',
27             isa => 'Bool',
28             default => 1
29             );
30              
31             has connected => (
32             is => 'rw',
33             isa => 'Bool',
34             default => 0
35             );
36              
37             has channels => (
38             traits => [ qw(Hash) ],
39             is => 'rw',
40             isa => 'HashRef',
41             default => sub { {} },
42             handles => {
43             _channel_exists => 'exists',
44             _get_channel => 'get',
45             _remove_channel => 'delete',
46             _set_channel => 'set',
47             }
48             );
49              
50              
51             has debug => (
52             is => 'rw',
53             isa => 'Bool',
54             default => 0
55             );
56              
57             has exchanges => (
58             traits => [ qw(Hash) ],
59             is => 'rw',
60             isa => 'HashRef',
61             default => sub { {} },
62             handles => {
63             _exchange_exists => 'exists',
64             _get_exchange => 'get',
65             _remove_exchange => 'delete',
66             _set_exchange => 'set',
67             }
68             );
69              
70             has queue => (
71             is => 'rw',
72             isa => 'Str'
73             );
74              
75             has queues => (
76             traits => [ qw(Hash) ],
77             is => 'rw',
78             isa => 'HashRef',
79             default => sub { {} },
80             handles => {
81             _queue_exists => 'exists',
82             _get_queue => 'get',
83             _remove_queue => 'delete',
84             _set_queue => 'set',
85             }
86             );
87              
88             has delivery_tag => (
89             traits => [ qw(Counter) ],
90             is => 'ro',
91             isa => 'Num',
92             default => 0,
93             handles => {
94             _inc_delivery_tag => 'inc',
95             _dec_delivery_tag => 'dec',
96             _reset_delivery_tag => 'reset',
97             },
98             );
99              
100             has _tx_messages => (
101             is => 'ro',
102             isa => 'HashRef',
103             default => sub{ {} },
104             );
105              
106              
107             sub channel_close {
108             my ($self, $channel) = @_;
109              
110             die "Not connected" unless $self->connected;
111              
112             die "Unknown channel: $channel" unless $self->_channel_exists($channel);
113              
114             $self->_remove_channel($channel);
115             }
116              
117              
118             sub channel_open {
119             my ($self, $channel) = @_;
120              
121             die "Not connected" unless $self->connected;
122              
123             $self->_set_channel($channel, 1);
124             }
125              
126              
127             sub connect {
128             my ($self) = @_;
129              
130             die('Unable to connect!') unless $self->connectable;
131              
132             $self->connected(1);
133             }
134              
135              
136             sub consume {
137             my ($self, $channel, $queue, $options) = @_;
138              
139             die "Not connected" unless $self->connected;
140              
141             die "Unknown channel" unless $self->_channel_exists($channel);
142              
143             die "Unknown queue" unless $self->_queue_exists($queue);
144              
145             $options = $self->_apply_defaults( $options, {
146             no_local => 0,
147             no_ack => 1,
148             exclusive => 0,
149             });
150              
151             die "no_ack=>0 is not supported at this time" if !$options->{no_ack};
152              
153             $self->queue($queue);
154             }
155              
156              
157             sub disconnect {
158             my ($self) = @_;
159              
160             die "Not connected" unless $self->connected;
161              
162             $self->connected(0);
163             }
164              
165              
166             sub exchange_declare {
167             my ($self, $channel, $exchange, $options) = @_;
168              
169             die "Not connected" unless $self->connected;
170              
171             die "Unknown channel" unless $self->_channel_exists($channel);
172              
173             $self->_set_exchange($exchange, 1);
174             }
175              
176              
177             sub tx_select {
178             my ($self, $channel) = @_;
179              
180             die "Not connected" unless $self->connected;
181              
182             die "Unknown channel: $channel" unless $self->_channel_exists($channel);
183              
184             my $messages = $self->_tx_messages->{ $channel };
185             die "Transaction already started" if $messages;
186              
187             $self->_tx_messages->{ $channel } = [];
188             }
189              
190              
191             sub tx_commit {
192             my ($self, $channel) = @_;
193              
194             die "Not connected" unless $self->connected;
195              
196             die "Unknown channel: $channel" unless $self->_channel_exists($channel);
197              
198             my $messages = $self->_tx_messages->{ $channel };
199             die "Transaction not yet started" unless $messages;
200              
201             foreach my $message (@$messages) {
202             $self->_publish( $channel, @$message );
203             }
204              
205             delete $self->_tx_messages->{ $channel };
206             }
207              
208              
209             sub tx_rollback {
210             my ($self, $channel) = @_;
211              
212             die "Not connected" unless $self->connected;
213              
214             die "Unknown channel: $channel" unless $self->_channel_exists($channel);
215              
216             my $messages = $self->_tx_messages->{ $channel };
217             die "Transaction not yet started" unless $messages;
218              
219             delete $self->_tx_messages->{ $channel };
220             }
221              
222              
223             sub get {
224             my ($self, $channel, $queue, $options) = @_;
225              
226             die "Not connected" unless $self->connected;
227              
228             die "Unknown channel" unless $self->_channel_exists($channel);
229              
230             die "Unknown queue: $queue" unless $self->_queue_exists($queue);
231              
232             my $message = shift(@{ $self->_get_queue($queue) });
233              
234             return undef unless defined($message);
235              
236             $message->{delivery_tag} = $self->_inc_delivery_tag;
237             $message->{content_type} = '';
238             $message->{redelivered} = 0;
239             $message->{message_count} = 0;
240              
241             return $message;
242             }
243              
244              
245             sub queue_bind {
246             my ($self, $channel, $queue, $exchange, $pattern) = @_;
247              
248             die "Not connected" unless $self->connected;
249              
250             die "Unknown channel: $channel" unless $self->_channel_exists($channel);
251              
252             die "Unknown queue: $queue" unless $self->_queue_exists($queue);
253              
254             die "Unknown exchange: $exchange" unless $self->_exchange_exists($exchange);
255              
256             my $binds = $self->bindings->{$exchange} || {};
257              
258             # Turn the pattern we're given into an actual regex
259             my $regex = $pattern;
260             if(($pattern =~ /\#/) || ($pattern =~ /\*/)) {
261             if($pattern =~ /\#/) {
262             $regex =~ s/\#/\.\*/g;
263             } elsif($pattern =~ /\*/) {
264             $regex =~ s/\*/\[^\.]\*/g;
265             }
266             $regex = '^'.$regex.'$';
267             $regex = qr($regex);
268             } else {
269             $regex = qr/^$pattern$/;
270             }
271              
272             # $self->_set_binding($routing_key, { queue => $queue, exchange => $exchange });
273             $binds->{$regex} = $queue;
274              
275             # In case these are new bindings
276             $self->bindings->{$exchange} = $binds;
277             }
278              
279              
280             sub queue_declare {
281             my ($self, $channel, $queue, $options) = @_;
282              
283             die "Not connected" unless $self->connected;
284              
285             die "Unknown channel: $channel" unless $self->_channel_exists($channel);
286              
287             $self->_set_queue($queue, []);
288             }
289              
290              
291             sub queue_unbind {
292             my ($self, $channel, $queue, $exchange, $routing_key) = @_;
293              
294             die "Not connected" unless $self->connected;
295              
296             die "Unknown channel: $channel" unless $self->_channel_exists($channel);
297              
298             die "Unknown queue: $queue" unless $self->_queue_exists($queue);
299              
300             die "Unknown exchange: $queue" unless $self->_exchange_exists($exchange);
301              
302             die "Unknown routing: $routing_key" unless $self->_binding_exists($routing_key);
303              
304             $self->_remove_binding($routing_key);
305             }
306              
307              
308             sub publish {
309             my $self = shift;
310             my $channel = shift;
311              
312             my $messages = $self->_tx_messages->{ $channel };
313             if ($messages) {
314             push @$messages, [ @_ ];
315             return;
316             }
317              
318             $self->_publish( $channel, @_ );
319             }
320              
321             sub _publish {
322             my ($self, $channel, $routing_key, $body, $options, $props) = @_;
323              
324             die "Not connected" unless $self->connected;
325              
326             die "Unknown channel: $channel" unless $self->_channel_exists($channel);
327              
328             my $exchange = $options->{exchange};
329             unless($exchange) {
330             $exchange = 'amq.direct';
331             }
332              
333             die "Unknown exchange: $exchange" unless $self->_exchange_exists($exchange);
334              
335             # Get the bindings for the specified exchange and test each key to see
336             # if our routing key matches. If it does, push it into the queue
337             my $binds = $self->bindings->{$exchange};
338             foreach my $pattern (keys %{ $binds }) {
339             if($routing_key =~ $pattern) {
340             print STDERR "Publishing '$routing_key' to ".$binds->{$pattern}."\n" if $self->debug;
341             my $message = {
342             body => $body,
343             routing_key => $routing_key,
344             exchange => $exchange,
345             props => $props || {},
346             };
347             push(@{ $self->_get_queue($binds->{$pattern}) }, $message);
348             }
349             }
350             }
351              
352              
353             sub recv {
354             my ($self) = @_;
355              
356             die "Not connected" unless $self->connected;
357              
358             my $queue = $self->queue;
359             die "No queue, did you consume() first?" unless defined($queue);
360              
361             my $message = shift(@{ $self->_get_queue($self->queue) });
362              
363             return undef unless defined $message;
364              
365             $message->{delivery_tag} = $self->_inc_delivery_tag;
366             $message->{consumer_tag} = '';
367              
368             return $message;
369             }
370              
371             sub _apply_defaults {
372             my ($self, $args, $defaults) = @_;
373              
374             $args ||= {};
375             my $new_args = {};
376              
377             foreach my $key (keys %$args) {
378             $new_args->{$key} = $args->{$key};
379             }
380              
381             foreach my $key (keys %$defaults) {
382             next if exists $new_args->{$key};
383             $new_args->{$key} = $defaults->{$key};
384             }
385              
386             return $new_args;
387             }
388              
389             1;
390              
391             __END__
392              
393             =pod
394              
395             =head1 NAME
396              
397             Test::Net::RabbitMQ - A mock RabbitMQ implementation for use when testing.
398              
399             =head1 VERSION
400              
401             version 0.10
402              
403             =head1 SYNOPSIS
404              
405             use Test::Net::RabbitMQ;
406              
407             my $mq = Test::Net::RabbitMQ->new;
408              
409             $mq->connect;
410              
411             $mq->channel_open(1);
412              
413             $mq->exchange_declare(1, 'order');
414             $mq->queue_declare(1, 'new-orders');
415              
416             $mq->queue_bind(1, 'new-orders', 'order', 'order.new');
417              
418             $mq->publish(1, 'order.new', 'hello!', { exchange => 'order' });
419              
420             $mq->consume(1, 'new-orders');
421              
422             my $msg = $mq->recv;
423            
424             # Or
425            
426             my $msg = $mq->get(1, 'order.new', {});
427              
428             =head1 DESCRIPTION
429              
430             Test::Net::RabbitMQ is a terrible approximation of using the real thing, but
431             hopefully will allow you to test systems that use L<Net::RabbitMQ> without
432             having to use an actual RabbitMQ instance.
433              
434             The general overview is that calls to C<publish> pushes a message into one
435             or more queues (or none if there are no bindings) and calls to C<recv>
436             pop them.
437              
438             =head1 CAVEATS
439              
440             This module has all the features I've needed to successfully test our
441             RabbitMQ-using application. Patches are welcome if I'm missing something you
442             need! At the moment there are a number of shortcomings:
443              
444             =over 4
445              
446             =item C<recv> doesn't block
447              
448             =item exchanges are all topic
449              
450             =item lots of other stuff!
451              
452             =back
453              
454             =head1 ATTRIBUTES
455              
456             =head2 connectable
457              
458             If false then any calls to connect will die to emulate a failed connection.
459              
460             =head2 debug
461              
462             If set to true (which you can do at any time) then a message will be emmitted
463             to STDERR any time a message is added to a queue.
464              
465             =head1 METHODS
466              
467             =head2 channel_close($number)
468              
469             Closes the specific channel.
470              
471             =head2 channel_open($number)
472              
473             Opens a channel with the specific number.
474              
475             =head2 connect
476              
477             Connects this instance. Does nothing except set C<connected> to true. Will
478             throw an exception if you've set C<connectable> to false.
479              
480             =head2 consume($channel, $queue)
481              
482             Sets the queue that will be popped when C<recv> is called.
483              
484             =head2 disconnect
485              
486             Disconnects this instance by setting C<connected> to false.
487              
488             =head2 exchange_declare($channel, $exchange, $options)
489              
490             Creates an exchange of the specified name.
491              
492             =head2 tx_select($channel)
493              
494             Begins a transaction on the specified channel. From this point forward all
495             publish() calls on the channel will be buffered until a call to L</tx_commit>
496             or L</tx_rollback> is made.
497              
498             =head2 tx_commit($channel)
499              
500             Commits a transaction on the specified channel, causing all buffered publish()
501             calls to this point to be published.
502              
503             =head2 tx_rollback($channel)
504              
505             Rolls the transaction back, causing all buffered publish() calls to be wiped.
506              
507             =head2 get ($channel, $queue, $options)
508              
509             Get a message from the queue, if there is one.
510              
511             Like C<Net::RabbitMQ>, this will return a hash containing the following
512             information:
513              
514             {
515             body => 'Magic Transient Payload', # the reconstructed body
516             routing_key => 'nr_test_q', # route the message took
517             exchange => 'nr_test_x', # exchange used
518             delivery_tag => 1, # (inc'd every recv or get)
519             redelivered => 0, # always 0
520             message_count => 0, # always 0
521             }
522              
523             =head2 queue_bind($channel, $queue, $exchange, $routing_key)
524              
525             Binds the specified queue to the specified exchange using the provided
526             routing key. B<Note that, at the moment, this doesn't work with AMQP wildcards.
527             Only with exact matches of the routing key.>
528              
529             =head2 queue_declare($channel, $queue, $options)
530              
531             Creates a queue of the specified name.
532              
533             =head2 queue_unbind($channel, $queue, $exchange, $routing_key)
534              
535             Unbinds the specified routing key from the provided queue and exchange.
536              
537             =head2 publish($channel, $routing_key, $body, $options)
538              
539             Publishes the specified body with the supplied routing key. If there is a
540             binding that matches then the message will be added to the appropriate queue(s).
541              
542             =head2 recv
543              
544             Provided you've called C<consume> then calls to recv will C<pop> the next
545             message of the queue. B<Note that this method does not block.>
546              
547             Like C<Net::RabbitMQ>, this will return a hash containing the following
548             information:
549              
550             {
551             body => 'Magic Transient Payload', # the reconstructed body
552             routing_key => 'nr_test_q', # route the message took
553             exchange => 'nr_test_x', # exchange used
554             delivery_tag => 1, # (inc'd every recv or get)
555             consumer_tag => '', # Always blank currently
556             props => $props, # hashref sent in
557             }
558              
559             =head1 AUTHOR
560              
561             Cory G Watson <gphat@cpan.org>
562              
563             =head1 COPYRIGHT AND LICENSE
564              
565             This software is copyright (c) 2014 by Cory G Watson.
566              
567             This is free software; you can redistribute it and/or modify it under
568             the same terms as the Perl 5 programming language system itself.
569              
570             =cut