File Coverage

blib/lib/Net/AMQP/RabbitMQ/Batch.pm
Criterion Covered Total %
statement 24 105 22.8
branch 0 28 0.0
condition 0 20 0.0
subroutine 8 19 42.1
pod 1 2 50.0
total 33 174 18.9


line stmt bran cond sub pod time code
1             package Net::AMQP::RabbitMQ::Batch;
2              
3 1     1   43096 use strict;
  1         3  
  1         25  
4 1     1   5 use warnings;
  1         2  
  1         36  
5              
6             our $VERSION = '0.2301';
7              
8 1     1   4 use Carp qw(carp croak cluck confess);
  1         2  
  1         44  
9 1     1   307 use Carp::Assert;
  1         955  
  1         8  
10 1     1   422 use Try::Tiny;
  1         1428  
  1         44  
11 1     1   343 use Net::AMQP::RabbitMQ;
  1         5447  
  1         30  
12 1     1   285 use Time::HiRes qw(time);
  1         867  
  1         3  
13 1     1   539 use Data::Dumper;
  1         4855  
  1         801  
14              
15             =encoding UTF-8
16              
17             =head1 NAME
18              
19             Net::AMQP::RabbitMQ::Batch - simple batch processing of messages for RabbitMQ.
20              
21             =head1 SYNOPSIS
22              
23             my $rb = Net::AMQP::RabbitMQ::Batch->new('localhost', { user => 'guest', password => 'guest' }) or croak;
24             $rb->process({
25             from_queue => 'test_in',
26             routing_key => 'test_out',
27             handler => \&msg_handler,
28             batch => {
29             size => 10, # batch size
30             timeout => 2, #
31             ignore_size => 0 # ignore in/out batches size mismatch
32             },
33             ignore_errors => 0, # ignore handler errors
34             publish_options => {
35             exchange => 'exchange_out', # exchange name, default is 'amq.direct'
36             },
37             });
38              
39             sub msg_handler {
40             my $messages = shift;
41             # work with 10 messages
42             return $messages;
43             }
44              
45             =head1 DESCRIPTION
46              
47             Assume you read messages from a queue, process them and publish. But you would like to do it in batches, processing many messages at once.
48              
49             This module:
50              
51             =over
52              
53             =item *
54             gets messages from in queue and publish them by routing key
55              
56             =item *
57             uses your handler to batch process messages
58              
59             =item *
60             keeps persistency - if processing fails, nothing lost from input queue, nothing published
61              
62             =back
63              
64             =head1 USAGE
65              
66             Define a messages handler:
67              
68             sub msg_handler {
69             my $messages = shift;
70             # works with hashref of messages
71             return $messages;
72             }
73              
74             C<$messages> is an arrayref of message objects:
75              
76             {
77             body => 'Magic Transient Payload', # the reconstructed body
78             routing_key => 'nr_test_q', # route the message took
79             delivery_tag => 1, # (used for acks)
80             ....
81             # Not all of these will be present. Consult the RabbitMQ reference for more details.
82             props => { ... }
83             }
84              
85             Handler should return arrayref of message objects (only C is required):
86              
87             [
88             { body => 'Processed message' },
89             ...
90             ]
91              
92             Connect to RabbitMQ:
93              
94             my $rb = Net::AMQP::RabbitMQ::Batch->new('localhost', { user => 'guest', password => 'guest' }) or croak;
95              
96             And process a batch:
97              
98             $rb->process({
99             from_queue => 'test_in',
100             routing_key => 'test_out',
101             handler => \&msg_handler,
102             batch => { size => 10 }
103             });
104              
105             You might like to wrap it with C loop. See F or F for example.
106              
107             =head1 METHODS
108              
109             =head2 process()
110              
111             =head1 Known Issues
112              
113             =over
114              
115             =item *
116             Can not set infinity timeout (use very long int)
117              
118             =item *
119             No individual messages processing possible
120              
121             =item *
122             No tests yet which is very sad :(
123              
124             =back
125              
126             =head1 AUTHORS
127              
128             Alex Svetkin
129              
130             =head1 LICENSE
131              
132             MIT
133              
134             =cut
135              
136             sub new {
137 0     0 0   my ($class, $rabbit_hostname, $rabbit_options) = @_;
138 0 0         croak('No hostname given') unless $rabbit_hostname;
139 0 0         croak('No connection options given') unless $rabbit_options;
140              
141 0           return bless {
142             mq => $class->_get_mq($rabbit_hostname, $rabbit_options),
143             }, $class;
144             }
145              
146             sub _get_mq {
147 0     0     my ($class, $rabbit_hostname, $rabbit_options) = @_;
148 0           my $mq = Net::AMQP::RabbitMQ->new();
149 0 0         $mq->connect($rabbit_hostname, $rabbit_options) or croak;
150 0           return $mq;
151             }
152              
153             sub process {
154 0     0 1   my ($self, $options) = @_;
155 0   0       my $channel_id = $options->{channel_id} || int(rand(65535)) + 1;
156 0 0         my $from_queue = $options->{from_queue} or croak('No from_queue given');
157 0 0 0       if (defined($options->{publish_options}) && !defined($options->{routing_key})) {
158 0           croak('publish_options set but not routing_key defined!');
159             }
160 0 0         my $publish = defined($options->{routing_key}) ? 1 : 0;
161 0           my $routing_key = $options->{routing_key};
162 0 0         my $handler = $options->{handler} or croak('No handler given');
163 0   0       my $ignore_errors = $options->{ignore_errors} || 0;
164              
165 0           my $success = 1;
166              
167             try {
168 0     0     $self->{mq}->channel_open($channel_id);
169 0           my $messages = $self->_get($channel_id, $from_queue, {no_ack => 0}, $options->{batch});
170 0           my $processed_messages = undef;
171             try {
172 0           $processed_messages = &$handler($messages);
173             } catch {
174 0 0         if ($ignore_errors) {
175 0           cluck("Batch handler error: $_");
176 0           $success = 0;
177             } else {
178 0           confess("Batch handler error: $_");
179             }
180 0           };
181 0 0 0       if ($success && $self->_check_messages($messages, $processed_messages, $options->{batch})) {
182 0 0         if ($publish) {
183             $self->_publish($processed_messages, $channel_id, $routing_key,
184 0           $options->{publish_options}, $options->{publish_props});
185             }
186 0           $self->_ack_messages($messages, $channel_id);
187             } else {
188 0           $success = 0;
189             }
190             } catch {
191 0     0     croak("Error: $_");
192             } finally {
193 0     0     $self->{mq}->channel_close($channel_id);
194 0           };
195              
196 0           return $success;
197             }
198              
199             sub _get {
200 0     0     my ($self, $channel_id, $queue, $mq_opts, $opts) = @_;
201 0           assert($channel_id);
202 0           assert($queue);
203 0   0       $opts->{size} ||= 10;
204 0   0       $opts->{timeout} ||= 30;
205 0   0       $opts->{sleep} ||= 1;
206              
207 0           my $batch_activity_ts = time();
208 0           my $messages = [];
209 0           while (scalar(@$messages) < $opts->{size}) {
210 0           my $msg = $self->{mq}->get($channel_id, $queue, $mq_opts);
211 0 0         if ($msg) {
212 0           $batch_activity_ts = time();
213 0           push(@$messages, $msg);
214             } else {
215 0 0         if (time() - $batch_activity_ts > $opts->{timeout}) {
216 0           last;
217             } else {
218 0           sleep($opts->{sleep});
219             }
220             }
221             }
222 0           return $messages;
223             }
224              
225             sub _publish {
226 0     0     my ($self, $messages, $channel_id, $queue, $mq_options, $mq_props) = @_;
227 0           assert(ref($messages) eq 'ARRAY');
228 0           assert($channel_id);
229 0           assert($queue);
230              
231 0           foreach my $msg (@$messages) {
232 0           assert($msg->{body});
233 0           $self->{mq}->publish($channel_id, $queue, $msg->{body}, $mq_options, $mq_props);
234             }
235 0           return;
236             }
237              
238             sub _ack_messages {
239 0     0     my ($self, $messages, $channel_id) = @_;
240 0           assert(ref($messages) eq 'ARRAY');
241 0           assert($channel_id);
242              
243 0           foreach my $msg (@$messages) {
244 0           assert($msg->{delivery_tag});
245 0           $self->{mq}->ack($channel_id, $msg->{delivery_tag});
246             }
247 0           return;
248             }
249              
250             sub _check_messages {
251 0     0     my ($self, $messages, $processed_messages, $options) = @_;
252 0           assert(ref($messages) eq 'ARRAY');
253 0           assert(ref($options) eq 'HASH');
254              
255 0 0         if (ref($processed_messages) ne 'ARRAY') {
256 0           carp('Invalid handler output (expected ARRAYREF)');
257 0           return 0;
258             }
259 0 0 0       if (!$options->{ignore_size} && scalar(@$messages) != scalar(@$processed_messages)) {
260 0           carp(sprintf('Numbers of incoming and processed messages do not match (expected %d, got %d). '
261             . 'Discarding this batch',
262             scalar(@$messages), scalar(@$processed_messages)));
263 0           return 0;
264             }
265 0           return 1;
266             }
267              
268             sub DESTROY {
269 0     0     my $self = shift;
270 0           $self->{mq}->disconnect();
271 0           return;
272             }
273              
274             1;