File Coverage

blib/lib/RabbitMQ/Consumer/Batcher.pm
Criterion Covered Total %
statement 43 44 97.7
branch 6 8 75.0
condition n/a
subroutine 11 11 100.0
pod 4 4 100.0
total 64 67 95.5


line stmt bran cond sub pod time code
1             package RabbitMQ::Consumer::Batcher;
2 3     3   2778 use Moose;
  3         1315820  
  3         21  
3              
4 3     3   27319 use namespace::autoclean;
  3         22480  
  3         25  
5              
6 3     3   183 use Try::Tiny;
  3         11  
  3         201  
7 3     3   1220 use RabbitMQ::Consumer::Batcher::Item;
  3         15  
  3         117  
8 3     3   1554 use RabbitMQ::Consumer::Batcher::Message;
  3         13  
  3         1906  
9              
10             our $VERSION = '0.1.1';
11              
12             =head1 NAME
13              
14             RabbitMQ::Consumer::Batcher - batch consumer of RMQ messages
15              
16             =head1 SYNOPSIS
17              
18             use AnyEvent;
19             use AnyEvent::RabbitMQ::PubSub;
20             use AnyEvent::RabbitMQ::PubSub::Consumer;
21             use RabbitMQ::Consumer::Batcher;
22              
23             my ($rmq_connection, $channel) = AnyEvent::RabbitMQ::PubSub::connect(
24             host => 'localhost',
25             port => 5672,
26             user => 'guest',
27             pass => 'guest',
28             vhost => '/',
29             );
30              
31             my $exchange = {
32             exchange => 'my_test_exchange',
33             type => 'topic',
34             durable => 0,
35             auto_delete => 1,
36             };
37              
38             my $queue = {
39             queue => 'my_test_queue';
40             auto_delete => 1,
41             };
42              
43             my $routing_key = 'my_rk';
44              
45             my $consumer = AnyEvent::RabbitMQ::PubSub::Consumer->new(
46             channel => $channel,
47             exchange => $exchange,
48             queue => $queue,
49             routing_key => $routing_key,
50             );
51             $consumer->init(); #declares channel, queue and binding
52              
53             my $batcher = RabbitMQ::Consumer::Batcher->new(
54             batch_size => $consumer->prefetch_count,
55             on_add => sub {
56             my ($batcher, $msg) = @_;
57              
58             my $decode_payload = decode_payload($msg->{header}, $msg->{body}->payload());
59             return $decode_payload;
60             },
61             on_add_catch => sub {
62             my ($batcher, $msg, $exception) = @_;
63              
64             if ($exception->$_isa('failure') && $exception->{payload}{stats_key}) {
65             $stats->increment($exception->{payload}{stats_key});
66             }
67              
68             if ($exception->$_isa('failure') && $exception->{payload}{reject}) {
69             $batcher->reject($msg);
70             $log->error("consume failed - reject: $exception\n".$msg->{body}->payload());
71             }
72             else {
73             $batcher->reject_and_republish($msg);
74             $log->error("consume failed - republish: $exception");
75             }
76             },
77             on_batch_complete => sub {
78             my ($batcher, $batch) = @_;
79              
80             path(...)->spew(join "\t", map { $_->value() } @$batch);
81             },
82             on_batch_complete_catch => sub {
83             my ($batcher, $batch, $exception) = @_;
84              
85             $log->error("save messages to file failed: $exception");
86             }
87             );
88              
89             my $cv = AnyEvent->condvar();
90             $consumer->consume($cv, $batcher->consume_code())->then(sub {
91             say 'Consumer was started...';
92             });
93              
94             =head1 DESCRIPTION
95              
96             If you need batch of messages from RabbitMQ - this module is for you.
97              
98             This module work well with L<AnyEvent::RabbitMQ::PubSub::Consumer>
99              
100             Idea of this module is - in I<on_add> phase is message validate and if is corrupted, can be reject.
101             In I<on_batch_complete> phase we manipulated with message which we don't miss.
102             If is some problem in this phase, messages are republished..
103              
104             =head1 METHODS
105              
106             =head2 new(%attributes)
107              
108             =head3 attributes
109              
110             =head4 batch_size
111              
112             Max batch size (trigger for C<on_batch_complete>)
113              
114             C<batch_size> must be C<prefetch_count> or bigger!
115              
116             this is required attribute
117              
118             =cut
119              
120             has 'batch_size' => (
121             is => 'ro',
122             isa => 'Int',
123             required => 1,
124             );
125              
126             =head4 on_add
127              
128             this callback are called after consume one single message. Is usefully for decoding for example.
129              
130             return value of callback are used as value in batch item (L<RabbitMQ::Consumer::Batcher::Item>)
131              
132             default behaviour is payload of message is used as item in batch
133              
134             return sub {
135             my($batcher, $msg) = @_;
136             return $msg->{body}->payload()
137             }
138              
139             parameters which are give to callback:
140              
141             =over
142              
143             =item C<$batcher>
144              
145             self instance of L<RabbitMQ::Consumer::Batcher>
146              
147             =item C<$msg>
148              
149             consumed message L<AnyEvent::RabbitMQ::Channel/on_consume>
150              
151             =back
152              
153             =cut
154              
155             has 'on_add' => (
156             is => 'ro',
157             isa => 'CodeRef',
158             default => sub {
159             return sub {
160             my ($batcher, $msg) = @_;
161             return $msg->{body}->payload();
162             }
163             }
164             );
165              
166             =head4 on_add_catch
167              
168             this callback are called if C<on_add> callback throws
169              
170             default behaviour do reject message
171              
172             return sub {
173             my ($batcher, $msg, $exception) = @_;
174              
175             $batcher->reject($msg);
176             }
177              
178             parameters which are give to callback:
179              
180             =over
181              
182             =item C<$batcher>
183              
184             self instance of L<RabbitMQ::Consumer::Batcher>
185              
186             =item C<$msg>
187              
188             consumed message L<AnyEvent::RabbitMQ::Channel/on_consume>
189              
190             =item C<$exception>
191              
192             exception string
193              
194             =back
195              
196             =cut
197              
198             has 'on_add_catch' => (
199             is => 'ro',
200             isa => 'CodeRef',
201             default => sub {
202             return sub {
203             my ($batcher, $msg, $exception) = @_;
204              
205             $batcher->reject($msg);
206             }
207             }
208             );
209              
210             =head4 on_batch_complete
211              
212             this callback is triggered if batch is complete (count of items is C<batch_size>)
213              
214             this is required attribute
215              
216             parameters which are give to callback:
217              
218              
219             =over
220              
221             =item C<$batcher>
222              
223             self instance of L<RabbitMQ::Consumer::Batcher>
224              
225             =item C<$batch>
226              
227             batch is I<ArrayRef> of L<RabbitMQ::Consumer::Batcher::Item>
228              
229             =back
230              
231             example C<on_batch_complete> I<CodeRef> (item I<value> are I<string>s)
232              
233             return sub {
234             my($batcher, $batch) = @_;
235              
236             print join "\n", map { $_->value() } @$batch;
237             $batcher->ack($batch);
238             }
239              
240             =cut
241              
242             has 'on_batch_complete' => (
243             is => 'ro',
244             isa => 'CodeRef',
245             required => 1,
246             );
247              
248             =head4 on_batch_complete_catch
249              
250             this callback are called if C<on_batch_complete> callback throws
251              
252             after this callback is batch I<reject_and_republish>
253              
254             If you need change I<reject_and_republish> of batch to (for example) I<reject>, you can do:
255              
256             return sub {
257             my ($batcher, $batch, $exception) = @_;
258              
259             $batcher->reject($batch);
260             #batch_clean must be called,
261             #because reject_and_republish after this exception handler will be called to...
262             $batcher->batch_clean();
263             }
264              
265             parameters which are give to callback:
266              
267             =over
268              
269             =item C<$batcher>
270              
271             self instance of L<RabbitMQ::Consumer::Batcher>
272              
273             =item C<$batch>
274              
275             I<ArrayRef> of L<RabbitMQ::Consumer::Batcher::Item>s
276              
277             =item C<$exception>
278              
279             exception string
280              
281             =back
282              
283             =cut
284              
285             has 'on_batch_complete_catch' => (
286             is => 'ro',
287             isa => 'Maybe[CodeRef]',
288             );
289              
290             has 'batch' => (
291             is => 'ro',
292             isa => 'ArrayRef[RabbitMQ::Consumer::Batcher::Item]',
293             default => sub { [] },
294             traits => ['Array'],
295             handles => {
296             add_to_batch => 'push',
297             clean_batch => 'clear',
298             count_of_batch_items => 'count',
299             batch_as_array => 'elements',
300             }
301             );
302              
303             =head2 consume_code()
304              
305             return C<sub{}> for handling messages in C<consume> method of L<AnyEvent::RabbitMQ::PubSub::Consumer>
306              
307             $consumer->consume($cv, $batcher->consume_code());
308              
309             =cut
310              
311             sub consume_code {
312 2     2 1 1314 my ($self) = @_;
313              
314             return sub {
315 12     12   3028 my ($consumer, $msg) = @_;
316              
317 12         451 my $message = RabbitMQ::Consumer::Batcher::Message->new(
318             %$msg,
319             consumer => $consumer,
320             );
321              
322             try {
323              
324 12         965 my $value = $self->on_add->($self, $message);
325              
326 7         775 $self->add_to_batch(
327             RabbitMQ::Consumer::Batcher::Item->new(
328             value => $value,
329             msg => $message,
330             )
331             );
332             }
333             catch {
334 5         543 $self->on_add_catch->($self, $message, $_);
335 12         99 };
336              
337 12 100       3454 if ($self->count_of_batch_items() >= $self->batch_size) {
338             try {
339 2         247 $self->on_batch_complete->($self, $self->batch);
340 1         706 $self->ack($self->batch_as_array());
341             }
342             catch {
343 1         20 my $exception = $_;
344 1 50       37 if (defined $self->on_batch_complete_catch) {
345 1         26 $self->on_batch_complete_catch->($self, $self->batch, $exception);
346             }
347 1         562 $self->reject_and_republish($self->batch_as_array());
348             }
349             finally {
350 2         1056 $self->clean_batch();
351 2         23 };
352             }
353             }
354 2         19 }
355              
356             =head2 ack(@items)
357              
358             ack all C<@items> (instances of L<RabbitMQ::Consumer::Batcher::Item> or L<RabbitMQ::Consumer::Batcher::Message>)
359              
360             =cut
361              
362             sub ack {
363 1     1 1 4 my ($self, @items) = @_;
364              
365 1         3 _consumer('ack', @items);
366             }
367              
368             =head2 reject(@items)
369              
370             reject all C<@items> (instances of L<RabbitMQ::Consumer::Batcher::Item> or L<RabbitMQ::Consumer::Batcher::Message>)
371              
372             =cut
373              
374             sub reject {
375 5     5 1 14 my ($self, @items) = @_;
376              
377 5         26 _consumer('reject', @items);
378             }
379              
380             =head2 reject_and_republish(@items)
381              
382             reject and republish all C<@items> (instances of L<RabbitMQ::Consumer::Batcher::Item> or L<RabbitMQ::Consumer::Batcher::Message>)
383              
384             =cut
385              
386             sub reject_and_republish {
387 1     1 1 3 my ($self, @items) = @_;
388              
389 1         4 _consumer('reject_and_republish', @items);
390             }
391              
392             sub _consumer {
393 7     7   18 my ($action, @items) = @_;
394              
395 7         21 foreach my $item (@items) {
396 12 100       2578 if ($item->isa('RabbitMQ::Consumer::Batcher::Item')) {
    50          
397 7         223 $item->msg->consumer->$action($item->msg);
398             }
399             elsif ($item->isa('RabbitMQ::Consumer::Batcher::Message')) {
400 5         144 $item->consumer->$action($item);
401             }
402             else {
403 0           die 'Unknown object (without consumer)';
404             }
405             }
406              
407             }
408              
409             =head1 contributing
410              
411             for dependency use L<cpanfile>...
412              
413             for resolve dependency use L<Carton> (or L<Carmel> - is more experimental)
414              
415             carton install
416              
417             for run test use C<minil test>
418              
419             carton exec minil test
420              
421              
422             if you don't have perl environment, is best way use docker
423              
424             docker run -it -v $PWD:/tmp/work -w /tmp/work avastsoftware/perl-extended carton install
425             docker run -it -v $PWD:/tmp/work -w /tmp/work avastsoftware/perl-extended carton exec minil test
426              
427             =head2 warning
428              
429             docker run default as root, all files which will be make in docker will be have root rights
430              
431             one solution is change rights in docker
432              
433             docker run -it -v $PWD:/tmp/work -w /tmp/work avastsoftware/perl-extended bash -c "carton install; chmod -R 0777 ."
434              
435             or after docker command (but you must have root rights)
436              
437             =head1 LICENSE
438              
439             Copyright (C) Avast Software.
440              
441             This library is free software; you can redistribute it and/or modify
442             it under the same terms as Perl itself.
443              
444             =head1 AUTHOR
445              
446             Jan Seidl E<lt>seidl@avast.comE<gt>
447              
448             =cut
449              
450             __PACKAGE__->meta->make_immutable();
451              
452             1;