File Coverage

lib/Kafka/Producer.pm
Criterion Covered Total %
statement 65 70 92.8
branch 26 34 76.4
condition 49 76 64.4
subroutine 14 14 100.0
pod 2 2 100.0
total 156 196 79.5


line stmt bran cond sub pod time code
1             package Kafka::Producer;
2              
3             =head1 NAME
4              
5             Kafka::Producer - Perl interface for Kafka producer client.
6              
7             =head1 VERSION
8              
9             This documentation refers to C version 1.06 .
10              
11             =cut
12              
13              
14              
15 6     6   26954 use 5.010;
  6         25  
16 6     6   33 use strict;
  6         11  
  6         142  
17 6     6   34 use warnings;
  6         13  
  6         274  
18              
19              
20              
21             our $VERSION = '1.06';
22              
23              
24              
25 6     6   53 use Carp;
  6         12  
  6         343  
26 6         345 use Params::Util qw(
27             _ARRAY
28             _INSTANCE
29             _NONNEGINT
30             _NUMBER
31             _STRING
32 6     6   42 );
  6         13  
33 6         236 use Scalar::Util qw(
34             blessed
35 6     6   36 );
  6         10  
36 6         337 use Scalar::Util::Numeric qw(
37             isint
38 6     6   65 );
  6         13  
39              
40 6         949 use Kafka qw(
41             %ERROR
42             $COMPRESSION_GZIP
43             $COMPRESSION_NONE
44             $COMPRESSION_SNAPPY
45             $ERROR_CANNOT_GET_METADATA
46             $ERROR_MISMATCH_ARGUMENT
47             $REQUEST_TIMEOUT
48             $NOT_SEND_ANY_RESPONSE
49             $WAIT_WRITTEN_TO_LOCAL_LOG
50             $BLOCK_UNTIL_IS_COMMITTED
51 6     6   34 );
  6         10  
52 6     6   45 use Kafka::Connection;
  6         14  
  6         540  
53 6     6   46 use Kafka::Exceptions;
  6         13  
  6         320  
54 6         5107 use Kafka::Internals qw(
55             $APIKEY_PRODUCE
56             $MAX_CORRELATIONID
57             $MAX_INT16
58             $MAX_INT32
59             $PRODUCER_ANY_OFFSET
60             _get_CorrelationId
61             format_message
62 6     6   41 );
  6         9  
63              
64              
65              
66             =head1 SYNOPSIS
67              
68             use 5.010;
69             use strict;
70             use warnings;
71              
72             use Scalar::Util qw(
73             blessed
74             );
75             use Try::Tiny;
76              
77             use Kafka::Connection;
78             use Kafka::Producer;
79              
80             my ( $connection, $producer );
81             try {
82              
83             #-- Connection
84             $connection = Kafka::Connection->new( host => 'localhost' );
85              
86             #-- Producer
87             $producer = Kafka::Producer->new( Connection => $connection );
88              
89             # Sending a single message
90             my $response = $producer->send(
91             'mytopic', # topic
92             0, # partition
93             'Single message' # message
94             );
95              
96             # Sending a series of messages
97             $response = $producer->send(
98             'mytopic', # topic
99             0, # partition
100             [ # messages
101             'The first message',
102             'The second message',
103             'The third message',
104             ]
105             );
106              
107             } catch {
108             my $error = $_;
109             if ( blessed( $error ) && $error->isa( 'Kafka::Exception' ) ) {
110             warn 'Error: (', $error->code, ') ', $error->message, "\n";
111             exit;
112             } else {
113             die $error;
114             }
115             };
116              
117             # Closes the producer and cleans up
118             undef $producer;
119             $connection->close;
120             undef $connection;
121              
122             =head1 DESCRIPTION
123              
124             Kafka producer API is implemented by C class.
125              
126             The main features of the C class are:
127              
128             =over 3
129              
130             =item *
131              
132             Provides object-oriented API for producing messages.
133              
134             =item *
135              
136             Provides Kafka PRODUCE requests.
137              
138             =back
139              
140             =cut
141              
142             my %known_compression_codecs = map { $_ => 1 } (
143             $COMPRESSION_NONE,
144             $COMPRESSION_GZIP,
145             $COMPRESSION_SNAPPY,
146             );
147              
148             #-- constructor ----------------------------------------------------------------
149              
150             =head2 CONSTRUCTOR
151              
152             =head3 C
153              
154             Creates new producer client object.
155              
156             C takes arguments in key-value pairs. The following arguments are currently recognized:
157              
158             =over 3
159              
160             =item C $connection>
161              
162             C<$connection> is the L object responsible for communication with
163             the Apache Kafka cluster.
164              
165             =item C $client_id>
166              
167             This is a user supplied identifier (string) for the client application.
168              
169             If C is not passed to constructor, its value will be automatically assigned
170             (to string C<'producer'>).
171              
172             =item C $acks>
173              
174             The C<$acks> should be an int16 signed integer.
175              
176             Indicates how many acknowledgements the servers should receive before responding to the request.
177              
178             If it is C<$NOT_SEND_ANY_RESPONSE> the server does not send any response.
179              
180             If it is C<$WAIT_WRITTEN_TO_LOCAL_LOG>, (default)
181             the server will wait until the data is written to the local log before sending a response.
182              
183             If it is C<$BLOCK_UNTIL_IS_COMMITTED>
184             the server will block until the message is committed by all in sync replicas before sending a response.
185              
186             C<$NOT_SEND_ANY_RESPONSE>, C<$WAIT_WRITTEN_TO_LOCAL_LOG>, C<$BLOCK_UNTIL_IS_COMMITTED>
187             can be imported from the L module.
188              
189             =item C $timeout>
190              
191             This provides a maximum time the server can await the receipt
192             of the number of acknowledgements in C.
193              
194             The C<$timeout> in seconds, could be any integer or floating-point type not bigger than int32 positive integer.
195              
196             Optional, default = C<$REQUEST_TIMEOUT>.
197              
198             C<$REQUEST_TIMEOUT> is the default timeout that can be imported from the
199             L module.
200              
201             =back
202              
203             =cut
204             sub new {
205 108     108 1 154309 my ( $class, %p ) = @_;
206              
207 108         442 my $self = bless {
208             Connection => undef,
209             ClientId => undef,
210             RequiredAcks => $WAIT_WRITTEN_TO_LOCAL_LOG,
211             Timeout => $REQUEST_TIMEOUT,
212             }, $class;
213              
214 108   66     847 exists $p{$_} and $self->{$_} = $p{$_} foreach keys %$self;
215              
216 108   100     467 $self->{ClientId} //= 'producer';
217              
218             $self->_error( $ERROR_MISMATCH_ARGUMENT, 'Connection' )
219 108 100       806 unless _INSTANCE( $self->{Connection}, 'Kafka::Connection' );
220             $self->_error( $ERROR_MISMATCH_ARGUMENT, 'ClientId' )
221 88 100 66     683 unless ( $self->{ClientId} eq '' || defined( _STRING( $self->{ClientId} ) ) ) && !utf8::is_utf8( $self->{ClientId} );
      100        
222             $self->_error( $ERROR_MISMATCH_ARGUMENT, format_message( 'Timeout (%s)', $self->{Timeout} ) )
223 80 50 66     545 unless defined _NUMBER( $self->{Timeout} ) && int( $self->{Timeout} * 1000 ) >= 1 && int( $self->{Timeout} * 1000 ) <= $MAX_INT32;
      66        
224              
225 70         130 my $required_acks = $self->{RequiredAcks};
226 70 100 66     610 $self->_error( $ERROR_MISMATCH_ARGUMENT, 'RequiredAcks' )
      66        
      100        
227             unless
228             defined( $required_acks )
229             && isint( $required_acks )
230             && (
231             $required_acks == $NOT_SEND_ANY_RESPONSE
232             || $required_acks == $WAIT_WRITTEN_TO_LOCAL_LOG
233             || $required_acks == $BLOCK_UNTIL_IS_COMMITTED
234             )
235             ;
236              
237 60         327 return $self;
238             }
239              
240             #-- public attributes ----------------------------------------------------------
241              
242             =head2 METHODS
243              
244             The following methods are defined for the C class:
245              
246             =cut
247              
248             #-- public methods -------------------------------------------------------------
249              
250             =head3 C
251              
252             Sends a messages on a L object.
253              
254             Returns a non-blank value (a reference to a hash with server response description)
255             if the message is successfully sent.
256              
257             C takes the following arguments:
258              
259             =over 3
260              
261             =item C<$topic>
262              
263             The C<$topic> must be a normal non-false string of non-zero length.
264              
265             =item C<$partition>
266              
267             The C<$partition> must be a non-negative integer.
268              
269             =item C<$messages>
270              
271             The C<$messages> is an arbitrary amount of data (a simple data string or
272             a reference to an array of the data strings).
273              
274             =item C<$keys>
275              
276             The C<$keys> are optional message keys, for partitioning with each message,
277             so the consumer knows the partitioning key.
278             This argument should be either a single string (common key for all messages),
279             or an array of strings with length matching messages array.
280              
281             =item C<$compression_codec>
282              
283             Optional.
284              
285             C<$compression_codec> sets the required type of C<$messages> compression,
286             if the compression is desirable.
287              
288             Supported codecs:
289             C<$COMPRESSION_NONE>,
290             C<$COMPRESSION_GZIP>,
291             C<$COMPRESSION_SNAPPY>.
292             The defaults that can be imported from the L module.
293              
294             Do not use C<$Kafka::SEND_MAX_ATTEMPTS> in Csend> request to prevent duplicates.
295              
296             =back
297              
298             =cut
299             sub send {
300 5070     5070 1 111065 my ( $self, $topic, $partition, $messages, $keys, $compression_codec ) = @_;
301              
302 5070 100 66     32870 $self->_error( $ERROR_MISMATCH_ARGUMENT, 'topic' )
      66        
      100        
303             unless defined( $topic ) && ( $topic eq '' || defined( _STRING( $topic ) ) ) && !utf8::is_utf8( $topic );
304 5062 100 66     24690 $self->_error( $ERROR_MISMATCH_ARGUMENT, 'partition' )
      66        
305             unless defined( $partition ) && isint( $partition ) && $partition >= 0;
306 5052 100 100     20360 $self->_error( $ERROR_MISMATCH_ARGUMENT, 'messages' )
307             unless defined( _STRING( $messages ) ) || _ARRAY( $messages );
308 5047 100 100     9786 $self->_error( $ERROR_MISMATCH_ARGUMENT, 'keys' )
      66        
309             unless ( !defined $keys || defined( _STRING( $keys ) ) || _ARRAY( $keys ) );
310             $self->_error( $ERROR_MISMATCH_ARGUMENT, 'compression_codec' )
311 5041 100 100     8131 unless ( !defined( $compression_codec ) || $known_compression_codecs{ $compression_codec } );
312              
313 5029 100       10559 $messages = [ $messages ] unless ref( $messages );
314 5029         7295 foreach my $message ( @$messages ) {
315 10042 100 66     49789 $self->_error( $ERROR_MISMATCH_ARGUMENT, format_message( 'message = %s', $message ) )
      100        
316             unless defined( $message ) && ( $message eq '' || ( defined( _STRING( $message ) ) && !utf8::is_utf8( $message ) ) );
317             }
318              
319 5020         6430 my $common_key;
320              
321 5020 50       11025 if( _ARRAY( $keys ) ) {
    100          
322             # ensure that keys array maytches messages array
323 0 0       0 $self->_error( $ERROR_MISMATCH_ARGUMENT, 'keys' )
324             unless scalar( @$keys ) == scalar( @$messages );
325              
326 0         0 foreach my $key ( @$keys ) {
327 0 0 0     0 $self->_error( $ERROR_MISMATCH_ARGUMENT, format_message( 'key = %s', $key ) )
      0        
      0        
328             unless !defined( $key ) || $key eq '' || ( defined( _STRING( $key ) ) && !utf8::is_utf8( $key ) );
329             }
330             }
331             elsif( defined $keys ) {
332 1 50 33     12 $self->_error( $ERROR_MISMATCH_ARGUMENT, format_message( 'key = %s', $keys ) )
      33        
333             unless $keys eq '' || ( defined( _STRING( $keys ) ) && !utf8::is_utf8( $keys ) );
334 0         0 $common_key = $keys;
335             }
336             else {
337 5019         6635 $common_key = '';
338             }
339              
340 5019         6539 my $MessageSet = [];
341             my $request = {
342             ApiKey => $APIKEY_PRODUCE,
343             CorrelationId => _get_CorrelationId(),
344             ClientId => $self->{ClientId},
345             RequiredAcks => $self->{RequiredAcks},
346 5019         11426 Timeout => int( $self->{Timeout} * 1000 ),
347             topics => [
348             {
349             TopicName => $topic,
350             partitions => [
351             {
352             Partition => $partition,
353             MessageSet => $MessageSet,
354             },
355             ],
356             },
357             ],
358             };
359              
360 5019         8547 my $key_index = 0;
361 5019         7226 foreach my $message ( @$messages ) {
362 10032 50 0     24782 push @$MessageSet, {
363             Offset => $PRODUCER_ANY_OFFSET,
364             Key => defined $common_key ? $common_key : ( $keys->[ $key_index ] // '' ),
365             Value => $message,
366             };
367 10032         14211 ++$key_index;
368             }
369              
370 5019         13905 my $result = $self->{Connection}->receive_response_to_request( $request, $compression_codec, $self->{Timeout} );
371 5017         35275 return $result;
372             }
373              
374             #-- private attributes ---------------------------------------------------------
375              
376             #-- private methods ------------------------------------------------------------
377              
378             # Handler for errors
379             sub _error {
380 99     99   152 my $self = shift;
381              
382 99         288 Kafka::Exception::Producer->throw( throw_args( @_ ) );
383              
384 0           return;
385             }
386              
387              
388              
389             1;
390              
391             __END__