File Coverage

blib/lib/Kafka/Consumer/Avro.pm
Criterion Covered Total %
statement 32 68 47.0
branch 0 10 0.0
condition 0 9 0.0
subroutine 11 20 55.0
pod 4 4 100.0
total 47 111 42.3


line stmt bran cond sub pod time code
1             package Kafka::Consumer::Avro;
2              
3             =pod
4              
5             =head1 NAME
6              
7             Kafka::Consumer::Avro - Avro message consumer for Apache Kafka.
8              
9             =head1 SYNOPSIS
10              
11             use Kafka qw/DEFAULT_MAX_BYTES/;
12             use Kafka::Connection;
13             use Kafka::Consumer::Avro;
14             use Confluent::SchemaRegistry;
15            
16             my $connection = Kafka::Connection->new( host => 'localhost' );
17            
18             my $consumer = Kafka::Consumer::Avro->new( Connection => $connection , SchemaRegistry => Confluent::SchemaRegistry->new() );
19            
20             # Consuming messages
21             my $messages = $consumer->fetch(
22             'mytopic', # topic
23             0, # partition
24             0, # offset
25             $DEFAULT_MAX_BYTES # Maximum size of MESSAGE(s) to receive
26             );
27            
28             if ($messages) {
29             foreach my $message (@$messages) {
30             if ( $message->valid ) {
31             say 'payload : ', $message->payload;
32             say 'key : ', $message->key;
33             say 'offset : ', $message->offset;
34             say 'next_offset: ', $message->next_offset;
35             }
36             else {
37             say 'error : ', $message->error;
38             }
39             }
40             }
41            
42             # Closes the consumer and cleans up
43             undef $consumer;
44             $connection->close;
45             undef $connection;
46              
47             =head1 DESCRIPTION
48              
49             C<Kafka::Consumer::Avro> main feature is to provide object-oriented API to
50             consume messages according to I<Confluent SchemaRegistry> and I<Avro> serialization.
51              
52             C<Kafka::Consumer::Avro> inerhits from and extends L<Kafka::Consumer|Kafka::Consumer>.
53              
54             =cut
55              
56 1     1   1169386 use 5.010;
  1         5  
57 1     1   6 use strict;
  1         2  
  1         20  
58 1     1   5 use warnings;
  1         2  
  1         23  
59              
60 1     1   5 use JSON::XS;
  1         2  
  1         60  
61 1     1   523 use IO::String;
  1         3866  
  1         34  
62              
63 1     1   22 use base 'Kafka::Consumer';
  1         3  
  1         623  
64              
65 1     1   4459 use Avro::BinaryDecoder;
  1         2324  
  1         34  
66 1     1   7 use Avro::Schema;
  1         2  
  1         19  
67 1     1   4 use Confluent::SchemaRegistry;
  1         3  
  1         21  
68              
69 1     1   9 use constant MAGIC_BYTE => 0;
  1         2  
  1         71  
70              
71 1     1   6 use version; our $VERSION = version->declare('v1.0.0');
  1         3  
  1         7  
72              
73             =head1 INSTALL
74              
75             Installation of C<Kafka::Consumer::Avro> is a canonical:
76              
77             perl Makefile.PL
78             make
79             make test
80             make install
81              
82             =head2 TEST NOTES
83              
84             Tests are focused on verifying Avro-formatted messages and theirs interactions with Confluent Schema Registry and are intended to extend C<Kafka::Consumer> test suite.
85              
86             They expect that in the target machine are available Kafka and Schema Registry listening on C<localhost> and default ports, otherwise most of the test are skipped.
87              
88             =head1 USAGE
89              
90             =head2 CONSTRUCTOR
91              
92             =head3 C<new>
93              
94             Creates new consumer client object.
95              
96             C<new()> takes arguments in key-value pairs as described in L<Kafka::Consumer|Kafka::Consumer> from which it inherits.
97              
98             In addition, takes in the following arguments:
99              
100             =over 3
101              
102             =item C<SchemaRegistry =E<gt> $schema_registry> (B<mandatory>)
103              
104             Is a L<Confluent::SchemaRegistry|Confluent::SchemaRegistry> instance.
105              
106             =back
107              
108             =cut
109              
110             sub new {
111 0     0 1   my $this = shift;
112 0   0       my $class = ref($this) || $this;
113 0           my $schema_registry_class = 'Confluent::SchemaRegistry';
114 0           my %params = @_;
115              
116             # Check SchemaRegistry param
117             die "Missing SchemaRegistry param"
118 0 0         unless exists $params{SchemaRegistry};
119             die "SchemaRegistry param must be a $schema_registry_class instance object"
120 0 0         unless ref($params{SchemaRegistry}) eq $schema_registry_class;
121 0           my $schema_registry = delete $params{SchemaRegistry};
122            
123             # Use parent class constructor
124 0           my $self = $class->SUPER::new(%params);
125            
126             # Add ans internal reference to SchemaRegistry
127 0           $self->{__SCHEMA_REGISTRY} = $schema_registry;
128            
129 0           return bless($self, $class);
130             }
131              
132              
133              
134             ##### Class methods
135             our $schemas = [];
136              
137             # Decode from Avro
138             sub _from_avro {
139 0   0 0     my $blob = shift || return undef;
140 0   0       my $sr = shift || return undef;
141 0           my $reader = IO::String->new( $blob );
142 0           seek( $reader, 1, 0 ); # Skip magic byte
143 0           my $buf = "\0\0\0\0";
144 0           read( $reader, $buf, 4 ); # Read schema version stored in avro message header
145 0           my $schema_id = unpack( "N", $buf ); # Retreive schema id from unsigned long (32 byte)
146 0 0         unless ( defined $schemas->[$schema_id] ) {
147 0   0       $schemas->[$schema_id] = $sr->get_schema_by_id( SCHEMA_ID => $schema_id ) || die "Unavailable schema for id $schema_id";
148             }
149 0           return Avro::BinaryDecoder->decode(
150             writer_schema => $schemas->[$schema_id],
151             reader_schema => $schemas->[$schema_id],
152             reader => $reader
153             );
154             }
155              
156             # Decode key and payload of the innput message returning a new Kafka::Message instancefrom Avro format according to an Avro schema
157             sub _decode_message {
158 0     0     my $message = shift;
159 0 0         die "Unknown message format"
160             unless $message->isa('Kafka::Message');
161 0           my $sr = shift;
162 0 0         die "Expected Confluent::SchemaRegistry object"
163             unless $sr->isa('Confluent::SchemaRegistry');
164 0           return Kafka::Message->new(
165             {
166             payload => _from_avro($message->payload, $sr),
167             key => _from_avro($message->key, $sr),
168             Timestamp => $message->Timestamp,
169             valid => $message->valid,
170             error => $message->error,
171             offset => $message->offset,
172             next_offset => $message->next_offset,
173             Attributes => $message->Attributes,
174             HighwaterMarkOffset => $message->HighwaterMarkOffset,
175             MagicByte => $message->MagicByte
176             }
177             );
178             }
179              
180              
181             ##### Private methods
182              
183 0     0     sub _clear_error { $_[0]->_set_error() }
184 0     0     sub _set_error { $_[0]->{__ERROR} = $_[1] }
185 0     0     sub _get_error { $_[0]->{__ERROR} }
186              
187              
188              
189              
190             ##### Public methods
191              
192             =head2 METHODS
193              
194             The following methods are defined for the C<Kafka::Avro::Consumer> class:
195              
196             =cut
197              
198              
199             =head3 C<schema_registry>()
200              
201             Returns the L<Confluent::SchemaRegistry|Confluent::SchemaRegistry> instance supplied to the construcor.
202              
203             =cut
204              
205 0     0 1   sub schema_registry { $_[0]->{__SCHEMA_REGISTRY} }
206              
207              
208             =head3 C<get_error>()
209              
210             Returns a string containing last error message.
211              
212             =cut
213              
214 0     0 1   sub get_error { $_[0]->_get_error() }
215              
216              
217             =head3 C<fetch( %params )>
218              
219             Gets messages froma a Kafka topic.
220              
221             Please, see L<Kafka::Consumer|Kafka::Consumer-E<gt>fetch()> for more details.
222              
223             =cut
224             sub fetch {
225 0     0 1   my $self = shift;
226 0           my $messages = $self->SUPER::fetch(@_);
227 0           my $sr = $self->schema_registry();
228 0           foreach my $message (@$messages) {
229 0           $message = _decode_message($message, $sr);
230             }
231 0           return $messages;
232             }
233              
234              
235              
236             =head1 AUTHOR
237              
238             Alvaro Livraghi, E<lt>alvarol@cpan.orgE<gt>
239              
240             =head1 CONTRIBUTE
241              
242             L<https://github.com/alivraghi/Kafka-Consumer-Avro>
243              
244             =head1 BUGS
245              
246             Please use GitHub project link above to report problems or contact authors.
247              
248             =head1 COPYRIGHT AND LICENSE
249              
250             Copyright 2018 by Alvaro Livraghi
251              
252             This program is free software; you can redistribute it and/or modify it under the same terms as Perl itself.
253              
254             =cut
255              
256             1;