File Coverage

lib/Kafka/Consumer.pm
Criterion Covered Total %
statement 92 125 73.6
branch 37 66 56.0
condition 81 185 43.7
subroutine 16 22 72.7
pod 9 9 100.0
total 235 407 57.7


line stmt bran cond sub pod time code
1             package Kafka::Consumer;
2              
3             =head1 NAME
4              
5             Kafka::Consumer - Perl interface for Kafka consumer client.
6              
7             =head1 VERSION
8              
9             This documentation refers to C<Kafka::Consumer> version 1.07.
10              
11             =cut
12              
13 5     5   2605 use 5.010;
  5         17  
14 5     5   24 use strict;
  5         6  
  5         95  
15 5     5   23 use warnings;
  5         7  
  5         207  
16              
17             our $VERSION = '1.07';
18              
19 5     5   21 use Carp;
  5         7  
  5         258  
20 5         251 use Params::Util qw(
21             _INSTANCE
22             _NONNEGINT
23             _NUMBER
24             _POSINT
25             _STRING
26 5     5   24 );
  5         14  
27 5         235 use Scalar::Util::Numeric qw(
28             isint
29 5     5   31 );
  5         8  
30              
31 5         674 use Kafka qw(
32             $BITS64
33             $DEFAULT_MAX_BYTES
34             $DEFAULT_MAX_NUMBER_OF_OFFSETS
35             $DEFAULT_MAX_WAIT_TIME
36             %ERROR
37             $ERROR_CANNOT_GET_METADATA
38             $ERROR_METADATA_ATTRIBUTES
39             $ERROR_MISMATCH_ARGUMENT
40             $ERROR_PARTITION_DOES_NOT_MATCH
41             $ERROR_TOPIC_DOES_NOT_MATCH
42             $MESSAGE_SIZE_OVERHEAD
43             $MIN_BYTES_RESPOND_IMMEDIATELY
44             $RECEIVE_LATEST_OFFSETS
45             $RECEIVE_EARLIEST_OFFSET
46 5     5   23 );
  5         6  
47 5     5   31 use Kafka::Exceptions;
  5         9  
  5         210  
48 5         549 use Kafka::Internals qw(
49             $APIKEY_FETCH
50             $APIKEY_OFFSET
51             $APIKEY_OFFSETCOMMIT
52             $APIKEY_OFFSETFETCH
53             $MAX_INT32
54             _get_CorrelationId
55             _isbig
56             format_message
57 5     5   25 );
  5         55  
58 5     5   28 use Kafka::Connection;
  5         5  
  5         344  
59 5     5   986 use Kafka::Message;
  5         10  
  5         7680  
60              
61             if ( !$BITS64 ) { eval 'use Kafka::Int64; 1;' or die "Cannot load Kafka::Int64 : $@"; } ## no critic
62              
63             =head1 SYNOPSIS
64              
65             use 5.010;
66             use strict;
67             use warnings;
68              
69             use Scalar::Util qw(
70             blessed
71             );
72             use Try::Tiny;
73              
74             use Kafka qw(
75             $DEFAULT_MAX_BYTES
76             $DEFAULT_MAX_NUMBER_OF_OFFSETS
77             $RECEIVE_EARLIEST_OFFSET
78             );
79             use Kafka::Connection;
80             use Kafka::Consumer;
81              
82             my ( $connection, $consumer );
83             try {
84              
85             #-- Connection
86             $connection = Kafka::Connection->new( host => 'localhost' );
87              
88             #-- Consumer
89             $consumer = Kafka::Consumer->new( Connection => $connection );
90              
91             # Get a valid offset before the given time
92             my $offset = $consumer->offset_before_time(
93             'mytopic', # topic
94             0, # partition
95             (time()-3600) * 1000, # time
96             );
97              
98             if ( defined $offset ) {
99             say "Received offset: $offset";
100             } else {
101             warn "Error: Offset is not received\n";
102             }
103              
104             # Consuming messages
105             my $messages = $consumer->fetch(
106             'mytopic', # topic
107             0, # partition
108             0, # offset
109             $DEFAULT_MAX_BYTES # Maximum size of MESSAGE(s) to receive
110             );
111              
112             if ( $messages ) {
113             foreach my $message ( @$messages ) {
114             if ( $message->valid ) {
115             say 'payload : ', $message->payload;
116             say 'key : ', $message->key;
117             say 'offset : ', $message->offset;
118             say 'next_offset: ', $message->next_offset;
119             } else {
120             say 'error : ', $message->error;
121             }
122             }
123             }
124              
125             } catch {
126             my $error = $_;
127             if ( blessed( $error ) && $error->isa( 'Kafka::Exception' ) ) {
128             warn 'Error: (', $error->code, ') ', $error->message, "\n";
129             exit;
130             } else {
131             die $error;
132             }
133             };
134              
135             # Closes the consumer and cleans up
136             undef $consumer;
137             $connection->close;
138             undef $connection;
139              
140             =head1 DESCRIPTION
141              
142             Kafka consumer API is implemented by C<Kafka::Consumer> class.
143              
144             The main features of the C<Kafka::Consumer> class are:
145              
146             =over 3
147              
148             =item *
149              
150             Provides an object-oriented API for consuming messages.
151              
152             =item *
153              
154             Provides Kafka FETCH and OFFSETS requests.
155              
156             =item *
157              
158             Supports parsing the Apache Kafka 0.9+ Wire Format protocol.
159              
160             =item *
161              
162             Works with 64-bit elements of the Kafka Wire Format protocol
163             on 32 bit systems.
164              
165             =back
166              
167             The Kafka consumer response returns ARRAY references for C<offsets> and
168             C<fetch> methods.
169              
170             Array returned by C<offsets> contains offset integers.
171              
172             Array returned by C<fetch> contains objects of L<Kafka::Message> class.
173              
174             =cut
175              
176             #-- constructor ----------------------------------------------------------------
177              
178             =head2 CONSTRUCTOR
179              
180             =head3 C<new>
181              
182             Creates a new consumer client object. Returns the created C<Kafka::Consumer> object.
183              
184             C<new()> takes arguments in key-value pairs. The following arguments are recognized:
185              
186             =over 3
187              
188             =item C<Connection =E<gt> $connection>
189              
190             C<$connection> is the L<Kafka::Connection> object responsible for communication with
191             the Apache Kafka cluster.
192              
193             =item C<ClientId =E<gt> $client_id>
194              
195             This is a user supplied identifier (string) for the client application.
196              
197             C<ClientId> will be auto-assigned (to C<'consumer'>) if not passed in when creating the L<Kafka::Consumer> object.
198              
199             =item C<MaxWaitTime =E<gt> $max_time>
200              
201             The maximum amount of time (seconds, may be fractional) to wait when no sufficient data is available at the time the
202             request was issued.
203              
204             Optional, default is C<$DEFAULT_MAX_WAIT_TIME>.
205              
206             C<$DEFAULT_MAX_WAIT_TIME> is the default time that can be imported from the
207             L<Kafka> module.
208              
209             The C<$max_time> must be a positive number.
210              
211             =item C<MinBytes =E<gt> $min_bytes>
212              
213             The minimum number of bytes of messages that must be available to give a response.
214             If the client sets this to C<$MIN_BYTES_RESPOND_IMMEDIATELY> the server will always respond
215             immediately. If it is set to C<$MIN_BYTES_RESPOND_HAS_DATA>, the server will respond as soon
216             as at least one partition has at least 1 byte of data or the specified timeout occurs.
217             Setting higher values in combination with the bigger timeouts allows reading larger chunks of data.
218              
219             Optional, int32 signed integer, default is C<$MIN_BYTES_RESPOND_IMMEDIATELY>.
220              
221             C<$MIN_BYTES_RESPOND_IMMEDIATELY>, C<$MIN_BYTES_RESPOND_HAS_DATA> are the defaults that
222             can be imported from the L<Kafka> module.
223              
224             The C<$min_bytes> must be a non-negative int32 signed integer.
225              
226             =item C<MaxBytes =E<gt> $max_bytes>
227              
228             The maximum bytes to include in the message set for this partition.
229              
230             Optional, int32 signed integer, default = C<$DEFAULT_MAX_BYTES> (1_000_000).
231              
232             The C<$max_bytes> must be more than C<$MESSAGE_SIZE_OVERHEAD>
233             (size of protocol overhead - data added by Kafka wire protocol to each message).
234              
235             C<$DEFAULT_MAX_BYTES>, C<$MESSAGE_SIZE_OVERHEAD>
236             are the defaults that can be imported from the L<Kafka> module.
237              
238             =item C<MaxNumberOfOffsets =E<gt> $max_number>
239              
240             Limit the number of offsets returned by Kafka.
241              
242             That is a positive integer.
243              
244             Optional, int32 signed integer, default = C<$DEFAULT_MAX_NUMBER_OF_OFFSETS> (100).
245              
246             C<$DEFAULT_MAX_NUMBER_OF_OFFSETS>
247             is the default that can be imported from the L<Kafka> module.
248              
249             =back
250              
251             =cut
252             sub new {
253 179     179 1 179023 my ( $class, %p ) = @_;
254              
255 179         653 my $self = bless {
256             Connection => undef,
257             ClientId => undef,
258             MaxWaitTime => $DEFAULT_MAX_WAIT_TIME,
259             MinBytes => $MIN_BYTES_RESPOND_IMMEDIATELY,
260             MaxBytes => $DEFAULT_MAX_BYTES,
261             MaxNumberOfOffsets => $DEFAULT_MAX_NUMBER_OF_OFFSETS,
262             ApiVersion => undef, # undef - allows consumer to choose newest supported
263             }, $class;
264              
265 179   66     1370 exists $p{$_} and $self->{$_} = $p{$_} foreach keys %$self;
266              
267 179   100     703 $self->{ClientId} //= 'consumer';
268              
269             $self->_error( $ERROR_MISMATCH_ARGUMENT, 'Connection' )
270 179 100       1040 unless _INSTANCE( $self->{Connection}, 'Kafka::Connection' );
271             $self->_error( $ERROR_MISMATCH_ARGUMENT, 'ClientId' )
272 159 100 66     784 unless ( $self->{ClientId} eq q{} || defined( _STRING( $self->{ClientId} ) ) ) && !utf8::is_utf8( $self->{ClientId} );
      100        
273             $self->_error( $ERROR_MISMATCH_ARGUMENT, format_message( 'MaxWaitTime (%s)', $self->{MaxWaitTime} ) )
274 151 100 100     914 unless defined( $self->{MaxWaitTime} ) && defined _NUMBER( $self->{MaxWaitTime} ) && int( $self->{MaxWaitTime} * 1000 ) >= 1 && int( $self->{MaxWaitTime} * 1000 ) <= $MAX_INT32;
      100        
      66        
275             $self->_error( $ERROR_MISMATCH_ARGUMENT, format_message( 'MinBytes (%s)', $self->{MinBytes} ) )
276 139 50 66     339 unless ( _isbig( $self->{MinBytes} ) ? ( $self->{MinBytes} >= 0 ) : defined( _NONNEGINT( $self->{MinBytes} ) ) ) && $self->{MinBytes} <= $MAX_INT32;
    100          
277             $self->_error( $ERROR_MISMATCH_ARGUMENT, format_message( 'MaxBytes (%s)', $self->{MaxBytes} ) )
278 126 50 100     1116 unless ( _isbig( $self->{MaxBytes} ) ? ( $self->{MaxBytes} > 0 ) : _POSINT( $self->{MaxBytes} ) ) && $self->{MaxBytes} >= $MESSAGE_SIZE_OVERHEAD && $self->{MaxBytes} <= $MAX_INT32;
    100 66        
279             $self->_error( $ERROR_MISMATCH_ARGUMENT, format_message( 'MaxNumberOfOffsets (%s)', $self->{MaxNumberOfOffsets} ) )
280 110 100 66     2335 unless defined( _POSINT( $self->{MaxNumberOfOffsets} ) ) && $self->{MaxNumberOfOffsets} <= $MAX_INT32;
281              
282 95         864 return $self;
283             }
284              
285             #-- public attributes ----------------------------------------------------------
286              
287             =head2 METHODS
288              
289             The following methods are defined for the C<Kafka::Consumer> class:
290              
291             =cut
292              
293             #-- public methods -------------------------------------------------------------
294              
295             =head3 C<fetch( $topic, $partition, $start_offset, $max_size )>
296              
297             Get a list of messages to consume one by one up to C<$max_size> bytes.
298              
299             Returns the reference to array of the L<Kafka::Message> objects.
300              
301             C<fetch()> takes the following arguments:
302              
303             =over 3
304              
305             =item C<$topic>
306              
307             The C<$topic> must be a normal non-false string of non-zero length.
308              
309             =item C<$partition>
310              
311             The C<$partition> must be a non-negative integer.
312              
313             =item C<$start_offset>
314              
315             Offset in topic and partition to start from (64-bit integer).
316              
317             The argument must be a non-negative integer. The argument may be a
318             L<Math::BigInt> integer on 32-bit system.
319              
320             =item C<$max_size>
321              
322             C<$max_size> is the maximum size of the messages set to return. The argument
323             must be a positive int32 signed integer.
324              
325             The maximum size of a request limited by C that
326             can be imported from L module.
327              
328             =back
329              
330             =cut
331             sub fetch {
332 5055     5055 1 141738 my ( $self, $topic, $partition, $start_offset, $max_size, $_return_all, $api_version ) = @_;
333             # Special argument: $_return_all - return redundant messages sent out of a compressed package posts
334              
335 5055 100 66     30339 $self->_error( $ERROR_MISMATCH_ARGUMENT, 'topic' )
      66        
      100        
336             unless defined( $topic ) && ( $topic eq q{} || defined( _STRING( $topic ) ) ) && !utf8::is_utf8( $topic );
337 5047 100 66     23724 $self->_error( $ERROR_MISMATCH_ARGUMENT, 'partition' )
      66        
338             unless defined( $partition ) && isint( $partition ) && $partition >= 0;
339 5037 100 66     13040 $self->_error( $ERROR_MISMATCH_ARGUMENT, 'offset' )
      100        
340             unless defined( $start_offset ) && ( ( _isbig( $start_offset ) && $start_offset >= 0 ) || defined( _NONNEGINT( $start_offset ) ) );
341 5024 100 66     41801 $self->_error( $ERROR_MISMATCH_ARGUMENT, format_message( 'max_size (%s)', $max_size ) )
      100        
      66        
      66        
342             unless ( !defined( $max_size ) || ( ( _isbig( $max_size ) || _POSINT( $max_size ) ) && $max_size >= $MESSAGE_SIZE_OVERHEAD && $max_size <= $MAX_INT32 ) );
343              
344             my $request = {
345             ApiKey => $APIKEY_FETCH,
346             ApiVersion => $api_version // $self->{ApiVersion},
347             CorrelationId => _get_CorrelationId(),
348             ClientId => $self->{ClientId},
349             MaxWaitTime => int( $self->{MaxWaitTime} * 1000 ),
350             MinBytes => $self->{MinBytes},
351             MaxBytes => $max_size // $self->{MaxBytes},
352             topics => [
353             {
354             TopicName => $topic,
355             partitions => [
356             {
357             Partition => $partition,
358             FetchOffset => $start_offset,
359             MaxBytes => $max_size // $self->{MaxBytes},
360             },
361 5011   33     52871 ],
      66        
      66        
362             },
363             ],
364             };
365              
366 5011         16308 my $response = $self->{Connection}->receive_response_to_request( $request, undef, $self->{MaxWaitTime} );
367              
368 5009         6830 my $messages = [];
369 5009         4917 foreach my $received_topic ( @{ $response->{topics} } ) {
  5009         7969  
370             $received_topic->{TopicName} eq $topic
371 5009 50       8131 or $self->_error( $ERROR_TOPIC_DOES_NOT_MATCH, format_message( "'%s' ne '%s'", $topic, $received_topic->{TopicName} ) );
372 5009         5003 foreach my $received_partition ( @{ $received_topic->{partitions} } ) {
  5009         6314  
373             $received_partition->{Partition} == $partition
374 5009 50       6383 or $self->_error( $ERROR_PARTITION_DOES_NOT_MATCH, format_message( '%s != %s', $partition, $received_partition->{Partition} ) );
375 5009         4577 my $HighwaterMarkOffset = $received_partition->{HighwaterMarkOffset};
376 5009         4316 foreach my $Message ( @{ $received_partition->{MessageSet} } ) {
  5009         5802  
377 15018         14928 my $offset = $Message->{Offset};
378 15018         12713 my $next_offset;
379 15018 50       17113 if ( $BITS64 ) {
380 15018         14032 $next_offset += $offset + 1;
381             } else {
382 0         0 $offset = Kafka::Int64::intsum( $offset, 0 );
383 0         0 $next_offset = Kafka::Int64::intsum( $offset, 1 );
384             }
385              
386             # skip previous messages of a compressed package posts
387 15018 50 33     20502 next if $offset < $start_offset && !$_return_all;
388              
389 15018         13262 my $message_error = q{};
390             # According to Apache Kafka documentation:
391             # This byte holds metadata attributes about the message. The
392             # lowest 3 bits contain the compression codec used for the
393             # message. The fourth lowest bit represents the timestamp type.
394             # 0 stands for CreateTime and 1 stands for LogAppendTime. The
395             # producer should always set this bit to 0. (since 0.10.0).
396             # All other bits should be set to 0.
397 15018         14005 my $attributes = $Message->{Attributes};
398             # check that attributes is valid
399             $attributes & 0b11110000
400 15018 50       18950 and $message_error = $ERROR{ $ERROR_METADATA_ATTRIBUTES };
401              
402 15018 50       18049 if (my $compression_codec = $attributes & 0b00000111) {
403 0 0 0     0 unless ( $compression_codec == 1 # GZIP
404             || $compression_codec == 2 # Snappy
405             ) {
406 0         0 $message_error = $ERROR{ $ERROR_METADATA_ATTRIBUTES };
407             }
408             }
409              
410             push( @$messages, Kafka::Message->new( {
411             Attributes => $Message->{Attributes},
412             Timestamp => $Message->{Timestamp},
413             MagicByte => $Message->{MagicByte},
414             key => $Message->{Key},
415             payload => $Message->{Value},
416 15018         70188 offset => $offset,
417             next_offset => $next_offset,
418             error => $message_error,
419             valid => !$message_error,
420             HighwaterMarkOffset => $HighwaterMarkOffset,
421             } )
422             );
423             }
424             }
425             }
426              
427 5009         31240 return $messages;
428             }
429              
430             =head3 C<offset_at_time( $topic, $partition, $time )>
431              
432             Returns an offset, given a topic, partition and time.
433              
434             The returned offset is the earliest offset whose timestamp is greater than or
435             equal to the given timestamp. The return value is a HashRef, containing
436             C<timestamp> and C<offset> keys.
437              
438             B<NOTE>: this method requires Kafka 0.10.0, and messages with timestamps.
439             Check the configuration of the brokers or topic, specifically
440             C<message.timestamp.type>, and set it either to C<LogAppendTime> to have Kafka
441             automatically set messages timestamps based on the broker clock, or
442             C<CreateTime>, in which case the client populating your topic has to set the
443             timestamps when producing messages.
444              
445             C<offset_at_time()> takes the following arguments:
446              
447             =over 3
448              
449             =item C<$topic>
450              
451             The C<$topic> must be a normal non-false string of non-zero length.
452              
453             =item C<$partition>
454              
455             The C<$partition> must be a non-negative integer.
456              
457             =item C<$time>
458              
459             Get the earliest offset whose timestamp is at or after the given time (in milliseconds since UNIX Epoch).
460              
461             The argument must be a non-negative integer.
462              
463             The argument may be a L<Math::BigInt> integer on 32 bit
464             system.
465              
466             =back
467              
468             =cut
469              
470             sub offset_at_time {
471 0     0 1 0 my ( $self, $topic, $partition, $time ) = @_;
472              
473             # we don't accept special values for $time, we want a real timestamp
474 0 0 0     0 $self->_error( $ERROR_MISMATCH_ARGUMENT, 'time' )
      0        
      0        
475             unless defined( $time ) && ( _isbig( $time ) || isint( $time ) ) && $time >= 0;
476              
477             # no max_number, api version = 1
478 0         0 return $self->_query_offsets($topic, $partition, $time, undef, 1)->[0];
479             }
480              
481             =head3 C<offset_before_time( $topic, $partition, $time )>
482              
483             Returns an offset, given a topic, partition and time.
484              
485             The returned offset is an offset whose timestamp is guaranteed to be earlier
486             than the given timestamp. The return value is a Number
487              
488             This method works with all versions of Kafka, and doesn't require messages with
489             timestamps.
490              
491             C<offset_before_time()> takes the following arguments:
492              
493             =over 3
494              
495             =item C<$topic>
496              
497             The C<$topic> must be a normal non-false string of non-zero length.
498              
499             =item C<$partition>
500              
501             The C<$partition> must be a non-negative integer.
502              
503             =item C<$time>
504              
505             Get offsets before the given time (in milliseconds since UNIX Epoch).
506              
507             The argument must be a non-negative integer.
508              
509             The argument may be a L<Math::BigInt> integer on 32 bit
510             system.
511              
512             =back
513              
514             =cut
515              
516             sub offset_before_time {
517 0     0 1 0 my ( $self, $topic, $partition, $time ) = @_;
518              
519             # we don't accept special values for $time, we want a real timestamp
520 0 0 0     0 $self->_error( $ERROR_MISMATCH_ARGUMENT, 'time' )
      0        
      0        
521             unless defined( $time ) && ( _isbig( $time ) || isint( $time ) ) && $time >= 0;
522             # $self->_error( $ERROR_MISMATCH_ARGUMENT, format_message( 'max_number (%s)', $max_number ) )
523             # unless !defined( $max_number ) || ( _POSINT( $max_number ) && $max_number <= $MAX_INT32 );
524              
525             # max_number = 1, api version = 0
526 0         0 return $self->_query_offsets($topic, $partition, $time, 1, 0)->[0];
527             }
528              
529             =head3 C<offset_earliest( $topic, $partition )>
530              
531             Returns the earliest offset for a given topic and partition
532              
533             C<offset_earliest()> takes the following arguments:
534              
535             =over 3
536              
537             =item C<$topic>
538              
539             The C<$topic> must be a normal non-false string of non-zero length.
540              
541             =item C<$partition>
542              
543             The C<$partition> must be a non-negative integer.
544              
545             =back
546              
547             =cut
548              
549             sub offset_earliest {
550 0     0 1 0 my ( $self, $topic, $partition ) = @_;
551              
552             # max_number = 1, api version = 0
553 0         0 return $self->_query_offsets($topic, $partition, $RECEIVE_EARLIEST_OFFSET, 1, 0)->[0];
554             }
555              
556             =head3 C<offset_latest( $topic, $partition )>
557              
558             Returns the latest offset for a given topic and partition
559              
560             C<offset_latest()> takes the following arguments:
561              
562             =over 3
563              
564             =item C<$topic>
565              
566             The C<$topic> must be a normal non-false string of non-zero length.
567              
568             =item C<$partition>
569              
570             The C<$partition> must be a non-negative integer.
571              
572             =back
573              
574             =cut
575              
576             sub offset_latest {
577 0     0 1 0 my ( $self, $topic, $partition ) = @_;
578              
579             # max_number = 1, api version = 0
580 0         0 return $self->_query_offsets($topic, $partition, $RECEIVE_LATEST_OFFSETS, 1, 0)->[0];
581             }
582              
583             =head3 C<offsets( $topic, $partition, $time, $max_number )>
584              
585             B<WARNING>: This method is DEPRECATED, please use one of C<offset_at_time()>, C<offset_before_time()>, C<offset_earliest()>, C<offset_latest()>. It is kept for backward compatibility.
586              
587             Returns an ArrayRef of offsets
588              
589             C<offsets()> takes the following arguments:
590              
591             =over 3
592              
593             =item C<$topic>
594              
595             The C<$topic> must be a normal non-false string of non-zero length.
596              
597             =item C<$partition>
598              
599             The C<$partition> must be a non-negative integer.
600              
601             =item C<$time>
602              
603             Get offsets before the given time (in milliseconds since UNIX Epoch). It must
604             be a positive number. It may be a L<Math::BigInt> integer on 32
605             bit system.
606              
607             The special values C<$RECEIVE_LATEST_OFFSETS> (-1), C<$RECEIVE_EARLIEST_OFFSET>
608             (-2) are allowed. They can be imported from the L<Kafka> module.
609              
610             =item C<$max_number>
611              
612             Maximum number of offsets to be returned
613              
614             =back
615              
616             =cut
617              
618             sub offsets {
619 63     63 1 2883829 my ( $self, $topic, $partition, $time, $max_number ) = @_;
620              
621 63 100 66     190 $self->_error( $ERROR_MISMATCH_ARGUMENT, 'time' )
      66        
      100        
622             unless defined( $time ) && ( _isbig( $time ) || isint( $time ) ) && $time >= $RECEIVE_EARLIEST_OFFSET;
623 52 100 66     1173 $self->_error( $ERROR_MISMATCH_ARGUMENT, format_message( 'max_number (%s)', $max_number ) )
      66        
624             unless !defined( $max_number ) || ( _POSINT( $max_number ) && $max_number <= $MAX_INT32 );
625              
626 38         376 return $self->_query_offsets($topic, $partition, $time, $max_number, 0);
627             }
628              
629             sub _query_offsets {
630 38     38   82 my ( $self, $topic, $partition, $time, $max_number, $api_version ) = @_;
631              
632 38 100 66     262 $self->_error( $ERROR_MISMATCH_ARGUMENT, 'topic' )
      66        
      100        
633             unless defined( $topic) && ( $topic eq q{} || defined( _STRING( $topic ) ) ) && !utf8::is_utf8( $topic );
634 30 100 66     173 $self->_error( $ERROR_MISMATCH_ARGUMENT, 'partition' )
      66        
635             unless defined( $partition ) && isint( $partition ) && $partition >= 0;
636              
637 20         37 my $is_v1 = $api_version == 1;
638              
639             my $request = {
640             ApiKey => $APIKEY_OFFSET,
641             ApiVersion => $api_version,
642             CorrelationId => _get_CorrelationId(),
643             ClientId => $self->{ClientId},
644             topics => [
645             {
646             TopicName => $topic,
647             partitions => [
648             {
649             Partition => $partition,
650             Time => $time,
651             MaxNumberOfOffsets => $max_number // $self->{MaxNumberOfOffsets},
652             },
653 20   66     62 ],
654             },
655             ],
656             };
657              
658 20         86 my $response = $self->{Connection}->receive_response_to_request( $request );
659              
660 18         22 my $offsets = [];
661             # because we accepted only one topic and partition, we are sure that the
662             # response is all about this single topic and partition, so we can merge
663             # the offsets.
664 18 50       30 if ($is_v1) {
665 0         0 foreach my $received_topic ( @{ $response->{topics} } ) {
  0         0  
666 0         0 foreach my $partition_offsets ( @{ $received_topic->{PartitionOffsets} } ) {
  0         0  
667             push @$offsets, { timestamp => $partition_offsets->{Timestamp},
668 0         0 offset => $partition_offsets->{Offset} };
669             }
670             }
671             } else {
672 18         25 foreach my $received_topic ( @{ $response->{topics} } ) {
  18         37  
673 18         16 foreach my $partition_offsets ( @{ $received_topic->{PartitionOffsets} } ) {
  18         21  
674 18         17 push @$offsets, @{ $partition_offsets->{Offset} };
  18         40  
675             }
676             }
677             }
678              
679 18         93 return $offsets;
680             }
681              
682             =head3 C<commit_offsets( $topic, $partition, $offset, $group )>
683              
684             Commit offsets using the offset commit/fetch API.
685              
686             Returns a non-blank value (a reference to a hash with server response description)
687             if the message is successfully sent.
688              
689             C<commit_offsets()> takes the following arguments:
690              
691             =over 3
692              
693             =item C<$topic>
694              
695             The C<$topic> must be a normal non-false string of non-zero length.
696              
697             =item C<$partition>
698              
699             The C<$partition> must be a non-negative integer.
700              
701             =item C<$offset>
702              
703             Offset in topic and partition to commit.
704              
705             The argument must be a non-negative integer.
706              
707             The argument may be a L<Math::BigInt> integer on 32 bit
708             system.
709              
710             =item C<$group>
711              
712             The name of the consumer group
713              
714             The argument must be a normal non-false string of non-zero length.
715              
716             =back
717              
718             =cut
719             sub commit_offsets {
                # Builds one OffsetCommit request for a single topic/partition pair on
                # behalf of consumer group $group and returns whatever
                # $self->{Connection}->receive_response_to_request() returns.
                # Every argument is checked first; a failed check is handed to
                # $self->_error() with $ERROR_MISMATCH_ARGUMENT and the argument name.
720 0     0 1 0 my ( $self, $topic, $partition, $offset, $group ) = @_;
721              
722              
                # topic: must be defined, and either the empty string or accepted by
                # _STRING(); values carrying Perl's internal UTF-8 flag are rejected.
723 0 0 0     0 $self->_error( $ERROR_MISMATCH_ARGUMENT, 'topic' )
      0        
      0        
724             unless defined( $topic ) && ( $topic eq q{} || defined( _STRING( $topic ) ) ) && !utf8::is_utf8( $topic );
                # partition: must be defined, pass isint() and be >= 0.
725 0 0 0     0 $self->_error( $ERROR_MISMATCH_ARGUMENT, 'partition' )
      0        
726             unless defined( $partition ) && isint( $partition ) && $partition >= 0;
                # offset: must be defined and either pass _isbig() with a value >= 0 or
                # be accepted by _NONNEGINT(), so 0 is a valid offset here.
                # NOTE(review): _isbig() is defined outside this excerpt - presumably a
                # big-integer object test; confirm.
727 0 0 0     0 $self->_error( $ERROR_MISMATCH_ARGUMENT, 'offset' )
      0        
728             unless defined( $offset ) && ( ( _isbig( $offset ) && $offset >= 0 ) || defined( _NONNEGINT( $offset ) ) );
                # group: same rule as topic (defined, empty or _STRING(), no UTF-8 flag).
729 0 0 0     0 $self->_error( $ERROR_MISMATCH_ARGUMENT, 'group' )
      0        
      0        
730             unless defined( $group ) && ( $group eq q{} || defined( _STRING( $group ) ) ) && !utf8::is_utf8( $group );
731              
                # The request carries exactly one topic with exactly one partition entry;
                # Metadata is always sent as the empty string.
                # NOTE(review): '__send_to__' looks like a routing hint for the
                # connection layer rather than a protocol field - confirm.
732             my $request = {
733             __send_to__ => 'group_coordinator',
734             ApiKey => $APIKEY_OFFSETCOMMIT,
735             CorrelationId => _get_CorrelationId(),
736             ClientId => $self->{ClientId},
737 0         0 GroupId => $group,
738             topics => [
739             {
740             TopicName => $topic,
741             partitions => [
742             {
743             Partition => $partition,
744             Offset => $offset,
745             Metadata => '',
746             },
747             ],
748             },
749             ],
750             };
751              
                # The connection's return value is passed back to the caller unchanged.
752 0         0 return $self->{Connection}->receive_response_to_request( $request );
753             }
754              
755             =head3 C<fetch_offsets( $topic, $partition, $group )>
756              
757             Fetch Committed offsets using the offset commit/fetch API.
758              
759             Returns a non-blank value (a reference to a hash with server response description)
760             if the message is successfully sent.
761              
762             C<fetch_offsets> takes the following arguments:
763              
764             =over 3
765              
766             =item C<$topic>
767              
768             The C<$topic> must be a normal non-false string of non-zero length.
769              
770             =item C<$partition>
771              
772             The C<$partition> must be a non-negative integer.
773              
774             =item C<$group>
775              
776             The name of the consumer group
777              
778             The argument must be a normal non-false string of non-zero length.
779              
780             =back
781              
782             =cut
783             sub fetch_offsets {
                # Builds one OffsetFetch request for a single topic/partition pair on
                # behalf of consumer group $group and returns whatever
                # $self->{Connection}->receive_response_to_request() returns.
                # Every argument is checked first; a failed check is handed to
                # $self->_error() with $ERROR_MISMATCH_ARGUMENT and the argument name.
784 0     0 1 0 my ( $self, $topic, $partition, $group ) = @_;
785              
786              
                # topic: must be defined, and either the empty string or accepted by
                # _STRING(); values carrying Perl's internal UTF-8 flag are rejected.
787 0 0 0     0 $self->_error( $ERROR_MISMATCH_ARGUMENT, 'topic' )
      0        
      0        
788             unless defined( $topic ) && ( $topic eq q{} || defined( _STRING( $topic ) ) ) && !utf8::is_utf8( $topic );
                # partition: must be defined, pass isint() and be >= 0.
789 0 0 0     0 $self->_error( $ERROR_MISMATCH_ARGUMENT, 'partition' )
      0        
790             unless defined( $partition ) && isint( $partition ) && $partition >= 0;
                # group: same rule as topic (defined, empty or _STRING(), no UTF-8 flag).
791 0 0 0     0 $self->_error( $ERROR_MISMATCH_ARGUMENT, 'group' )
      0        
      0        
792             unless defined( $group ) && ( $group eq q{} || defined( _STRING( $group ) ) ) && !utf8::is_utf8( $group );
793              
                # The request names exactly one topic with exactly one partition entry;
                # unlike the commit request, the partition entry holds only the
                # partition number.
                # NOTE(review): '__send_to__' looks like a routing hint for the
                # connection layer rather than a protocol field - confirm.
794             my $request = {
795             __send_to__ => 'group_coordinator',
796             ApiKey => $APIKEY_OFFSETFETCH,
797             CorrelationId => _get_CorrelationId(),
798             ClientId => $self->{ClientId},
799 0         0 GroupId => $group,
800             topics => [
801             {
802             TopicName => $topic,
803             partitions => [
804             {
805             Partition => $partition,
806             },
807             ],
808             },
809             ],
810             };
811              
                # The connection's return value is passed back to the caller unchanged.
812 0         0 return $self->{Connection}->receive_response_to_request( $request );
813             }
814              
815             #-- private attributes ---------------------------------------------------------
816              
817             #-- private methods ------------------------------------------------------------
818              
819             # Handler for errors
820             sub _error {
                # Central error path for this class: drops the invocant from @_ and
                # throws Kafka::Exception::Consumer, passing it the result of
                # throw_args() applied to the remaining arguments.
                # NOTE(review): throw_args() is defined outside this excerpt; how it
                # maps an error code and detail text to exception fields should be
                # confirmed there.
821 171     171   295 my $self = shift;
822              
823 171         381 Kafka::Exception::Consumer->throw( throw_args( @_ ) );
824              
                # Only reached if throw() returns; this report shows no executions of
                # this statement against 171 for the throw above.
825 0           return;
826             }
827              
828              
829              
830             1;
831              
832             __END__