File Coverage

lib/Kafka/Producer.pm
Criterion Covered Total %
statement 69 82 84.1
branch 30 52 57.6
condition 54 91 59.3
subroutine 14 14 100.0
pod 2 2 100.0
total 169 241 70.1


line stmt bran cond sub pod time code
1             package Kafka::Producer;
2              
3             =head1 NAME
4              
5             Kafka::Producer - Perl interface for Kafka producer client.
6              
7             =head1 VERSION
8              
9             This documentation refers to C version 1.07 .
10              
11             =cut
12              
13              
14              
15 6     6   23540 use 5.010;
  6         18  
16 6     6   26 use strict;
  6         12  
  6         105  
17 6     6   27 use warnings;
  6         9  
  6         256  
18              
19              
20              
21             our $VERSION = '1.07';
22              
23              
24              
25 6     6   30 use Carp;
  6         9  
  6         305  
26 6         315 use Params::Util qw(
27             _ARRAY
28             _INSTANCE
29             _NONNEGINT
30             _NUMBER
31             _STRING
32             _POSINT
33 6     6   32 );
  6         10  
34 6         215 use Scalar::Util qw(
35             blessed
36 6     6   31 );
  6         7  
37 6         263 use Scalar::Util::Numeric qw(
38             isint
39 6     6   28 );
  6         8  
40              
41 6         792 use Kafka qw(
42             %ERROR
43             $COMPRESSION_GZIP
44             $COMPRESSION_NONE
45             $COMPRESSION_SNAPPY
46             $COMPRESSION_LZ4
47             $ERROR_CANNOT_GET_METADATA
48             $ERROR_MISMATCH_ARGUMENT
49             $REQUEST_TIMEOUT
50             $NOT_SEND_ANY_RESPONSE
51             $WAIT_WRITTEN_TO_LOCAL_LOG
52             $BLOCK_UNTIL_IS_COMMITTED
53 6     6   33 );
  6         8  
54 6     6   33 use Kafka::Connection;
  6         10  
  6         385  
55 6     6   28 use Kafka::Exceptions;
  6         13  
  6         228  
56 6         5252 use Kafka::Internals qw(
57             $APIKEY_PRODUCE
58             $MAX_CORRELATIONID
59             $MAX_INT16
60             $MAX_INT32
61             $PRODUCER_ANY_OFFSET
62             _get_CorrelationId
63             format_message
64 6     6   27 );
  6         14  
65              
66              
67              
68             =head1 SYNOPSIS
69              
70             use 5.010;
71             use strict;
72             use warnings;
73              
74             use Scalar::Util qw(
75             blessed
76             );
77             use Try::Tiny;
78              
79             use Kafka::Connection;
80             use Kafka::Producer;
81              
82             my ( $connection, $producer );
83             try {
84              
85             #-- Connection
86             $connection = Kafka::Connection->new( host => 'localhost' );
87              
88             #-- Producer
89             $producer = Kafka::Producer->new( Connection => $connection );
90              
91             # Sending a single message
92             my $response = $producer->send(
93             'mytopic', # topic
94             0, # partition
95             'Single message' # message
96             );
97              
98             # Sending a series of messages
99             $response = $producer->send(
100             'mytopic', # topic
101             0, # partition
102             [ # messages
103             'The first message',
104             'The second message',
105             'The third message',
106             ]
107             );
108              
109             } catch {
110             my $error = $_;
111             if ( blessed( $error ) && $error->isa( 'Kafka::Exception' ) ) {
112             warn 'Error: (', $error->code, ') ', $error->message, "\n";
113             exit;
114             } else {
115             die $error;
116             }
117             };
118              
119             # Closes the producer and cleans up
120             undef $producer;
121             $connection->close;
122             undef $connection;
123              
124             =head1 DESCRIPTION
125              
126             Kafka producer API is implemented by C class.
127              
128             The main features of the C class are:
129              
130             =over 3
131              
132             =item *
133              
134             Provides object-oriented API for producing messages.
135              
136             =item *
137              
138             Provides Kafka PRODUCE requests.
139              
140             =back
141              
142             =cut
143              
144             my %known_compression_codecs = map { $_ => 1 } (
145             $COMPRESSION_NONE,
146             $COMPRESSION_GZIP,
147             $COMPRESSION_SNAPPY,
148             $COMPRESSION_LZ4,
149             );
150              
151             #-- constructor ----------------------------------------------------------------
152              
153             =head2 CONSTRUCTOR
154              
155             =head3 C
156              
157             Creates new producer client object.
158              
159             C takes arguments in key-value pairs. The following arguments are currently recognized:
160              
161             =over 3
162              
163             =item C $connection>
164              
165             C<$connection> is the L object responsible for communication with
166             the Apache Kafka cluster.
167              
168             =item C $client_id>
169              
170             This is a user supplied identifier (string) for the client application.
171              
172             If C is not passed to constructor, its value will be automatically assigned
173             (to string C<'producer'>).
174              
175             =item C $acks>
176              
177             The C<$acks> should be an int16 signed integer.
178              
179             Indicates how many acknowledgements the servers should receive before responding to the request.
180              
181             If it is C<$NOT_SEND_ANY_RESPONSE> the server does not send any response.
182              
183             If it is C<$WAIT_WRITTEN_TO_LOCAL_LOG>, (default)
184             the server will wait until the data is written to the local log before sending a response.
185              
186             If it is C<$BLOCK_UNTIL_IS_COMMITTED>
187             the server will block until the message is committed by all in sync replicas before sending a response.
188              
189             C<$NOT_SEND_ANY_RESPONSE>, C<$WAIT_WRITTEN_TO_LOCAL_LOG>, C<$BLOCK_UNTIL_IS_COMMITTED>
190             can be imported from the L module.
191              
192             =item C $timeout>
193              
194             This provides a maximum time the server can await the receipt
195             of the number of acknowledgements in C.
196              
197             The C<$timeout> in seconds, could be any integer or floating-point type not bigger than int32 positive integer.
198              
199             Optional, default = C<$REQUEST_TIMEOUT>.
200              
201             C<$REQUEST_TIMEOUT> is the default timeout that can be imported from the
202             L module.
203              
204             =back
205              
206             =cut
207             sub new {
208 108     108 1 103907 my ( $class, %p ) = @_;
209              
210 108         328 my $self = bless {
211             Connection => undef,
212             ClientId => undef,
213             RequiredAcks => $WAIT_WRITTEN_TO_LOCAL_LOG,
214             Timeout => $REQUEST_TIMEOUT,
215             }, $class;
216              
217 108   66     652 exists $p{$_} and $self->{$_} = $p{$_} foreach keys %$self;
218              
219 108   100     343 $self->{ClientId} //= 'producer';
220              
221             $self->_error( $ERROR_MISMATCH_ARGUMENT, 'Connection' )
222 108 100       579 unless _INSTANCE( $self->{Connection}, 'Kafka::Connection' );
223             $self->_error( $ERROR_MISMATCH_ARGUMENT, 'ClientId' )
224 88 100 66     476 unless ( $self->{ClientId} eq '' || defined( _STRING( $self->{ClientId} ) ) ) && !utf8::is_utf8( $self->{ClientId} );
      100        
225             $self->_error( $ERROR_MISMATCH_ARGUMENT, format_message( 'Timeout (%s)', $self->{Timeout} ) )
226 80 50 66     395 unless defined _NUMBER( $self->{Timeout} ) && int( $self->{Timeout} * 1000 ) >= 1 && int( $self->{Timeout} * 1000 ) <= $MAX_INT32;
      66        
227              
228 70         94 my $required_acks = $self->{RequiredAcks};
229 70 100 66     426 $self->_error( $ERROR_MISMATCH_ARGUMENT, 'RequiredAcks' )
      66        
      100        
230             unless
231             defined( $required_acks )
232             && isint( $required_acks )
233             && (
234             $required_acks == $NOT_SEND_ANY_RESPONSE
235             || $required_acks == $WAIT_WRITTEN_TO_LOCAL_LOG
236             || $required_acks == $BLOCK_UNTIL_IS_COMMITTED
237             )
238             ;
239              
240 60         203 return $self;
241             }
242              
243             #-- public attributes ----------------------------------------------------------
244              
245             =head2 METHODS
246              
247             The following methods are defined for the C class:
248              
249             =cut
250              
251             #-- public methods -------------------------------------------------------------
252              
253             =head3 C
254              
255             Sends a messages on a L object.
256              
257             Returns a non-blank value (a reference to a hash with server response description)
258             if the message is successfully sent.
259              
260             C takes the following arguments:
261              
262             =over 3
263              
264             =item C<$topic>
265              
266             The C<$topic> must be a normal non-false string of non-zero length.
267              
268             =item C<$partition>
269              
270             The C<$partition> must be a non-negative integer.
271              
272             =item C<$messages>
273              
274             The C<$messages> is an arbitrary amount of data (a simple data string or
275             a reference to an array of the data strings).
276              
277             =item C<$keys>
278              
279             The C<$keys> are optional message keys, for partitioning with each message,
280             so the consumer knows the partitioning key.
281             This argument should be either a single string (common key for all messages),
282             or an array of strings with length matching messages array.
283              
284             =item C<$compression_codec>
285              
286             Optional.
287              
288             C<$compression_codec> sets the required type of C<$messages> compression,
289             if the compression is desirable.
290              
291             Supported codecs:
292             C<$COMPRESSION_NONE>,
293             C<$COMPRESSION_GZIP>,
294             C<$COMPRESSION_SNAPPY>,
295             C<$COMPRESSION_LZ4>.
296             The defaults that can be imported from the L module.
297              
298             =item C<$timestamps>
299              
300             Optional.
301              
302             This is the timestamps of the C<$messages>.
303              
304             This argument should be either a single number (common timestamp for all messages),
305             or an array of integers with length matching messages array.
306              
307             Unit is milliseconds since beginning of the epoch (midnight Jan 1, 1970 (UTC)).
308              
309             B: timestamps supported since Kafka 0.10.0.
310              
311              
312             Do not use C<$Kafka::SEND_MAX_ATTEMPTS> in Csend> request to prevent duplicates.
313              
314             =back
315              
316             =cut
317             sub send {
318 5070     5070 1 87435 my ( $self, $topic, $partition, $messages, $keys, $compression_codec, $timestamps ) = @_;
319              
320 5070 100 66     28086 $self->_error( $ERROR_MISMATCH_ARGUMENT, 'topic' )
      66        
      100        
321             unless defined( $topic ) && ( $topic eq '' || defined( _STRING( $topic ) ) ) && !utf8::is_utf8( $topic );
322 5062 100 66     19921 $self->_error( $ERROR_MISMATCH_ARGUMENT, 'partition' )
      66        
323             unless defined( $partition ) && isint( $partition ) && $partition >= 0;
324 5052 100 100     14975 $self->_error( $ERROR_MISMATCH_ARGUMENT, 'messages' )
325             unless defined( _STRING( $messages ) ) || _ARRAY( $messages );
326 5047 100 100     7893 $self->_error( $ERROR_MISMATCH_ARGUMENT, 'keys' )
      66        
327             unless ( !defined( $keys ) || defined( _STRING( $keys ) ) || _ARRAY( $keys ) );
328             $self->_error( $ERROR_MISMATCH_ARGUMENT, 'compression_codec' )
329 5041 100 100     5856 unless ( !defined( $compression_codec ) || $known_compression_codecs{ $compression_codec } );
330 5029 0 33     7099 $self->_error( $ERROR_MISMATCH_ARGUMENT, 'timestamps' )
      33        
331             unless ( !defined( $timestamps ) || defined( _POSINT( $timestamps ) ) || _ARRAY( $timestamps ) );
332              
333 5029 100       7200 $messages = [ $messages ] unless ref( $messages );
334 5029         5844 foreach my $message ( @$messages ) {
335 10042 100 66     36826 $self->_error( $ERROR_MISMATCH_ARGUMENT, format_message( 'message = %s', $message ) )
      100        
336             unless defined( $message ) && ( $message eq '' || ( defined( _STRING( $message ) ) && !utf8::is_utf8( $message ) ) );
337             }
338              
339 5020         4735 my $common_key;
340              
341 5020 50       8016 if( _ARRAY( $keys ) ) {
    100          
342             # ensure that keys array maytches messages array
343 0 0       0 $self->_error( $ERROR_MISMATCH_ARGUMENT, 'keys' )
344             unless scalar( @$keys ) == scalar( @$messages );
345              
346 0         0 foreach my $key ( @$keys ) {
347 0 0 0     0 $self->_error( $ERROR_MISMATCH_ARGUMENT, format_message( 'key = %s', $key ) )
      0        
      0        
348             unless !defined( $key ) || $key eq '' || ( defined( _STRING( $key ) ) && !utf8::is_utf8( $key ) );
349             }
350             }
351             elsif( defined $keys ) {
352 1 50 33     12 $self->_error( $ERROR_MISMATCH_ARGUMENT, format_message( 'key = %s', $keys ) )
      33        
353             unless $keys eq '' || ( defined( _STRING( $keys ) ) && !utf8::is_utf8( $keys ) );
354 0         0 $common_key = $keys;
355             }
356             else {
357 5019         5061 $common_key = '';
358             }
359              
360 5019         4650 my ($common_ts, $use_ts);
361              
362 5019 50       8511 if( _ARRAY( $timestamps ) ) {
    50          
363             # ensure that timestamps array maytches messages array
364 0 0       0 $self->_error( $ERROR_MISMATCH_ARGUMENT, 'timestamps' )
365             unless scalar( @$timestamps ) == scalar( @$messages );
366              
367 0         0 foreach my $ts ( @$timestamps ) {
368 0 0 0     0 $self->_error( $ERROR_MISMATCH_ARGUMENT, format_message( 'timestamp = %s', $ts ) )
369             unless !defined( $ts ) || defined( _POSINT( $ts ) );
370             }
371 0         0 $use_ts = 1;
372             }
373             elsif( defined $timestamps ) {
374 0 0       0 $self->_error( $ERROR_MISMATCH_ARGUMENT, format_message( 'timestamp = %s', $timestamps ) )
375             unless defined( _POSINT( $timestamps ));
376 0         0 $common_ts = $timestamps;
377 0         0 $use_ts = 1;
378             }
379              
380 5019         5502 my $MessageSet = [];
381             my $request = {
382             ApiKey => $APIKEY_PRODUCE,
383             CorrelationId => _get_CorrelationId(),
384             ClientId => $self->{ClientId},
385             RequiredAcks => $self->{RequiredAcks},
386 5019         10201 Timeout => int( $self->{Timeout} * 1000 ),
387             topics => [
388             {
389             TopicName => $topic,
390             partitions => [
391             {
392             Partition => $partition,
393             MessageSet => $MessageSet,
394             },
395             ],
396             },
397             ],
398             };
399 5019 50 66     15289 if ( ( defined $compression_codec and $compression_codec == $COMPRESSION_LZ4 ) or defined $timestamps ) {
      33        
400 0         0 $request->{ApiVersion} = 2;
401             }
402              
403 5019         4517 my $key_index = 0;
404 5019         5653 foreach my $message ( @$messages ) {
405 10032 50 0     23744 push @$MessageSet, {
    0          
    50          
406             Offset => $PRODUCER_ANY_OFFSET,
407             Key => defined $common_key ? $common_key : ( $keys->[ $key_index ] // '' ),
408             Value => $message,
409             $use_ts ? ( Timestamp => defined $common_ts ? $common_ts : $timestamps->[$key_index] ) : (),
410             };
411 10032         11599 ++$key_index;
412             }
413              
414 5019         11324 my $result = $self->{Connection}->receive_response_to_request( $request, $compression_codec, $self->{Timeout} );
415 5017         22506 return $result;
416             }
417              
418             #-- private attributes ---------------------------------------------------------
419              
420             #-- private methods ------------------------------------------------------------
421              
422             # Handler for errors
423             sub _error {
424 99     99   108 my $self = shift;
425              
426 99         233 Kafka::Exception::Producer->throw( throw_args( @_ ) );
427              
428 0           return;
429             }
430              
431              
432              
433             1;
434              
435             __END__