File Coverage

blib/lib/Crixa/Queue.pm
Criterion Covered Total %
statement 15 64 23.4
branch 0 30 0.0
condition 0 7 0.0
subroutine 5 16 31.2
pod 7 8 87.5
total 27 125 21.6


line stmt bran cond sub pod time code
1             package Crixa::Queue;
2              
3 1     1   4 use strict;
  1         1  
  1         32  
4 1     1   3 use warnings;
  1         2  
  1         21  
5 1     1   4 use namespace::autoclean;
  1         1  
  1         6  
6              
7             our $VERSION = '0.13';
8              
9 1     1   68 use Moose;
  1         2  
  1         5  
10              
11 1     1   5203 use Crixa::Message;
  1         3  
  1         761  
12              
13             with qw(Crixa::HasMQ);
14              
15             has name => (
16             isa => 'Str',
17             reader => 'name',
18             writer => '_name',
19             predicate => '_has_name',
20             );
21              
22             has channel => (
23             isa => 'Crixa::Channel',
24             is => 'ro',
25             required => 1,
26             );
27              
28             has passive => (
29             is => 'ro',
30             isa => 'Bool',
31             default => 0,
32             );
33              
34             has durable => (
35             is => 'ro',
36             isa => 'Bool',
37             default => 0,
38             );
39              
40             has exclusive => (
41             is => 'ro',
42             isa => 'Bool',
43             default => 0,
44             );
45              
46             has auto_delete => (
47             is => 'ro',
48             isa => 'Bool',
49             default => 0,
50             );
51              
52             sub BUILD {
53 0     0 0   my $self = shift;
54              
55 0           my $name = $self->_queue_declare;
56 0 0         return if $self->_has_name;
57 0           $self->_name($name);
58             }
59              
60             sub _queue_declare {
61 0     0     my $self = shift;
62 0           my $passive = shift;
63              
64 0           my $props = $self->_props;
65 0 0         $props->{passive} = 1 if $passive;
66              
67 0   0       return $self->_mq->queue_declare(
68             $self->channel->id,
69             $self->name // q{},
70             $props,
71             );
72             }
73              
74             sub message_count {
75 0     0 1   my $self = shift;
76              
77 0           my ( undef, $message_count, undef ) = $self->_queue_declare('passive');
78              
79 0           return $message_count;
80             }
81              
82             sub check_for_message {
83 0     0 1   my $self = shift;
84 0 0         my $args = @_ > 1 ? {@_} : ref $_[0] ? $_[0] : {};
    0          
85              
86 0           return $self->_inflate_message(
87             $self->_mq->get( $self->channel->id, $self->name, $args ) );
88             }
89              
90             sub _inflate_message {
91 0     0     my $self = shift;
92 0           my $msg = shift;
93              
94 0 0         return unless defined $msg;
95              
96 0           return Crixa::Message->new( %$msg, channel => $self->channel );
97             }
98              
99             sub wait_for_message {
100 0     0 1   my $self = shift;
101              
102 0           my $msg;
103 0           do { $msg = $self->check_for_message(@_); } until ( defined $msg );
  0            
104 0           return $msg;
105             }
106              
107             sub handle_message {
108 0     0 1   my $self = shift;
109 0           my $handler = shift;
110 0 0         my $args = @_ > 1 ? {@_} : ref $_[0] ? $_[0] : {};
    0          
111              
112 0           my $msg = $self->wait_for_message($args);
113 0           for ($msg) { return $handler->($msg) }
  0            
114 0           confess 'Something unusual happened.';
115             }
116              
117             ## no critic (Subroutines::ProhibitBuiltinHomonyms)
118             sub bind {
119 0     0 1   my $self = shift;
120 0 0         my $args = @_ > 1 ? {@_} : ref $_[0] ? $_[0] : {};
    0          
121              
122 0   0       $self->_mq->queue_bind(
      0        
123             $self->channel->id,
124             $self->name,
125             $args->{exchange},
126             $args->{routing_key} // $self->name,
127             $args->{headers} // {},
128             );
129             }
130              
131             sub delete {
132 0     0 1   my $self = shift;
133 0 0         my $args = @_ > 1 ? {@_} : ref $_[0] ? $_[0] : {};
    0          
134              
135 0           $self->_mq->queue_delete( $self->channel->id, $self->name, $args );
136             }
137             ## use critic
138              
139             sub consume {
140 0     0 1   my $self = shift;
141 0           my $cb = shift;
142 0 0         my $args = @_ > 1 ? {@_} : ref $_[0] ? $_[0] : {};
    0          
143              
144 0           my $timeout = delete $args->{timeout};
145              
146 0           my $tag = $self->_mq->consume( $self->channel->id, $self->name, $args );
147 0           while (1) {
148 0 0         my $raw = $self->_mq->recv( $timeout ? $timeout : () );
149 0 0         last unless $cb->( $self->_inflate_message($raw) );
150             }
151 0           $self->_mq->cancel( $self->channel->id, $tag );
152             }
153              
154             sub _props {
155 0     0     my $self = shift;
156              
157 0           return { map { $_ => $self->$_() }
  0            
158             qw( passive durable exclusive auto_delete ) };
159             }
160              
161             __PACKAGE__->meta->make_immutable;
162              
163             1;
164              
165             # ABSTRACT: A Crixa Queue
166              
167             __END__
168              
169             =pod
170              
171             =head1 NAME
172              
173             Crixa::Queue - A Crixa Queue
174              
175             =head1 VERSION
176              
177             version 0.13
178              
179             =head1 DESCRIPTION
180              
181             This class represents a single queue. With RabbitMQ, messages are published to
182             exchanges, which then route the message to one or more queues. You then
183             consume those messages from the queue.
184              
185             =encoding UTF-8
186              
187             =head1 METHODS
188              
189             This class provides the following methods:
190              
191             =head2 Crixa::Queue->new(...)
192              
193             This method creates a new queue object. You should not call this method
194             directly under normal circumstances. Instead, you should create a queue by
195             calling the C<queue> method on a L<Crixa::Channel> or L<Crixa::Exchange>
196             object. However, you need to know what parameters the constructor accepts.
197              
198             =over 4
199              
200             =item * name
201              
202             The name of the queue. If none is provided then RabbitMQ will auto-generate a
203             name for you.
204              
205             =item * passive => $bool
206              
207             If this is true, then the constructor will throw an error B<unless the queue
208             already exists>.
209              
210             This defaults to false.
211              
212             =item * durable => $bool
213              
214             If this is true, then the queue will remain active across server
215             restarts.
216              
217             This defaults to false.
218              
219             =item * auto_delete => $bool
220              
221             If this is true, then the queue will be deleted when there are no more
222             consumers subscribed to it. The queue initially exists until at least one
223             consumer subscribes.
224              
225             This defaults to false.
226              
227             =item * exclusive => $bool
228              
229             If this is true, then the queue is only accessible via the current connection
230             and will be deleted when that connection closes.
231              
232             This defaults to false.
233              
234             =back
235              
236             =head2 $queue->check_for_message(...)
237              
238             This checks the queue for a message. This method does not block. It returns
239             C<undef> if there is no message ready. It accepts either a hash or
240             hashref with the following keys:
241              
242             =over 4
243              
244             =item no_ack => $bool
245              
246             If this is true, then the message is not acknowledged as it is taken from the
247             queue. You will need to explicitly acknowledge it using the C<ack> method on
248             the L<Crixa::Channel> object from which the message came.
249              
250             If this is false, then the message is acknowledged immediately. Calling the
251             C<ack> method later with this message's delivery tag will be an error.
252              
253             This defaults to true.
254              
255             =back
256              
257             =head2 $queue->consume($callback, ...)
258              
259             This method starts consuming messages via the AMQP consume API using the given
260             callback. Internally, this uses the C<poll()> system call to efficiently wait
261             for messages to come in. You are strongly encouraged to use this over the
262             C<wait_for_message()> method and instead of calling C<check_for_message()> in
263             a loop.
264              
265             The callback you provide will be passed a single optional argument. This
266             argument is always a C<Crixa::Message> object. However, if you specified a
267             timeout (see below), then your callback may be called without any arguments at
268             all.
269              
270             The callback is expected to return true or false. If it returns true, Crixa
271             will continue waiting for new messages. If it returns false, it will cancel
272             the consumer and the C<consume()> method will return.
273              
274             Note that if you create an "auto-delete" queue, then it will be deleted after
275             the last consumer is cancelled.
276              
277             This method also accepts either a hash or hashref with the following keys
278             after the callback:
279              
280             =over 4
281              
282             =item * timeout => $integer
283              
284             This is an optional timeout for each internal call to the C<<
285             Net::AMQP::RabbitMQ->recv() >> method. If you specify this, then your callback
286             will be called without any arguments whenever that call times out.
287              
288             =item * consumer_tag => $string
289              
290             A string identifying the consumer. If you don't provide one it will be
291             generated automatically. This will be available from the L<Crixa::Message>
292             object passed to your callback, regardless of whether it is auto-generated or
293             not.
294              
295             =item * no_local => $bool
296              
297             If this is true, then the server will not send messages to the same connection
298             as the one from which they were published.
299              
300             This defaults to false.
301              
302             =item * no_ack => $bool
303              
304             If this is true, then the message is not acknowledged as it is taken from the
305             queue. You will need to explicitly acknowledge it using the C<ack> method on
306             the L<Crixa::Channel> object from which the message came.
307              
308             If this is false, then the message is acknowledged immediately. Calling the
309             C<ack> method later with this message's delivery tag will be an error.
310              
311             This defaults to true.
312              
313             =item * exclusive => $bool
314              
315             If this is true, then only this consumer may access the queue. If another
316             consumer attempts to access the queue at the same time it will receive an
317             error.
318              
319             This defaults to false.
320              
321             =back
322              
323             =head2 $queue->wait_for_message(...)
324              
325             This blocks until a message is ready. It always returns a single message.
326              
327             This takes the same parameters as the C<check_for_message> method.
328              
329             =head2 $queue->handle_message($callback, ...)
330              
331             This method takes a callback and blocks until the next message. It calls the
332             callback with the message as its only argument and returns whatever the
333             callback returns.
334              
335             This takes the same parameters as the C<check_for_message> method after the
336             callback.
337              
338             =head2 $queue->message_count
339              
340             Returns the number of messages waiting in the queue.
341              
342             =head2 $queue->bind(...)
343              
344             This binds a queue to an exchange. It accepts either a hash or hashref with
345             the following keys:
346              
347             =over 4
348              
349             =item * exchange
350              
351             The name of the exchange to which the queue will be bound. This is required.
352              
353             =item * routing_key
354              
355             An optional routing key for the binding. If none is given the queue name is
356             used instead.
357              
358             =item * headers
359              
360             An optional hashref used when binding to a headers matching exchange.
361              
362             This hashref should contain the headers against which the queue is
363             matching.
364              
365             You can also specify an C<x-match> key of either "any" or "all". If the value
366             is "any" then the queue will receive a message when any of the headers in the
367             message match those that the queue was bound with. If it is set to "all" then
368             all headers in the message must match the binding.
369              
370             =back
371              
372             =head2 $queue->delete(...)
373              
374             This deletes the queue. It accepts either a hash or hashref with the
375             following keys:
376              
377             =over 4
378              
379             =item * if_unused => $bool
380              
381             If this is true, then the queue is only deleted if it has no consumers. Given
382             the way that Crixa handles getting messages, this is irrelevant if you are
383             only using Crixa to communicate with the queue.
384              
385             This defaults to true.
386              
387             =item * if_empty => $bool
388              
389             If this is true, then the queue is only deleted if it is empty.
390              
391             This defaults to true.
392              
393             =back
394              
395             =head2 $queue->name
396              
397             Returns the queue name.
398              
399             =head2 $queue->channel
400              
401             Returns the L<Crixa::Channel> that this queue uses.
402              
403             =head2 $queue->passive
404              
405             This returns the passive flag as passed to the constructor or set by a
406             default.
407              
408             =head2 $queue->durable
409              
410             This returns the durable flag as passed to the constructor or set by a
411             default.
412              
413             =head2 $queue->auto_delete
414              
415             This returns the auto-delete flag as passed to the constructor or set by a
416             default.
417              
418             =head2 $queue->exclusive
419              
420             This returns the exclusive flag as passed to the constructor or set by a
421             default.
422              
423             =head1 AUTHORS
424              
425             =over 4
426              
427             =item *
428              
429             Chris Prather <chris@prather.org>
430              
431             =item *
432              
433             Dave Rolsky <autarch@urth.org>
434              
435             =back
436              
437             =head1 COPYRIGHT AND LICENSE
438              
439             This software is copyright (c) 2012 - 2015 by Chris Prather.
440              
441             This is free software; you can redistribute it and/or modify it under
442             the same terms as the Perl 5 programming language system itself.
443              
444             =cut