File Coverage

lib/Kafka/Consumer.pm
Criterion Covered Total %
statement 92 125 73.6
branch 37 66 56.0
condition 80 182 43.9
subroutine 16 22 72.7
pod 9 9 100.0
total 234 404 57.9


line stmt bran cond sub pod time code
1             package Kafka::Consumer;
2              
3             =head1 NAME
4              
5             Kafka::Consumer - Perl interface for Kafka consumer client.
6              
7             =head1 VERSION
8              
9             This documentation refers to C<Kafka::Consumer> version 1.06.
10              
11             =cut
12              
13 5     5   3280 use 5.010;
  5         19  
14 5     5   79 use strict;
  5         11  
  5         111  
15 5     5   58 use warnings;
  5         10  
  5         209  
16              
17             our $VERSION = '1.06';
18              
19 5     5   25 use Carp;
  5         9  
  5         304  
20 5         280 use Params::Util qw(
21             _INSTANCE
22             _NONNEGINT
23             _NUMBER
24             _POSINT
25             _STRING
26 5     5   28 );
  5         9  
27 5         281 use Scalar::Util::Numeric qw(
28             isint
29 5     5   28 );
  5         10  
30              
31 5         790 use Kafka qw(
32             $BITS64
33             $DEFAULT_MAX_BYTES
34             $DEFAULT_MAX_NUMBER_OF_OFFSETS
35             $DEFAULT_MAX_WAIT_TIME
36             %ERROR
37             $ERROR_CANNOT_GET_METADATA
38             $ERROR_METADATA_ATTRIBUTES
39             $ERROR_MISMATCH_ARGUMENT
40             $ERROR_PARTITION_DOES_NOT_MATCH
41             $ERROR_TOPIC_DOES_NOT_MATCH
42             $MESSAGE_SIZE_OVERHEAD
43             $MIN_BYTES_RESPOND_IMMEDIATELY
44             $RECEIVE_LATEST_OFFSETS
45             $RECEIVE_EARLIEST_OFFSET
46 5     5   27 );
  5         15  
47 5     5   35 use Kafka::Exceptions;
  5         13  
  5         233  
48 5         547 use Kafka::Internals qw(
49             $APIKEY_FETCH
50             $APIKEY_OFFSET
51             $APIKEY_OFFSETCOMMIT
52             $APIKEY_OFFSETFETCH
53             $MAX_INT32
54             _get_CorrelationId
55             _isbig
56             format_message
57 5     5   27 );
  5         8  
58 5     5   29 use Kafka::Connection;
  5         11  
  5         380  
59 5     5   1160 use Kafka::Message;
  5         13  
  5         9425  
60              
61             if ( !$BITS64 ) { eval 'use Kafka::Int64; 1;' or die "Cannot load Kafka::Int64 : $@"; } ## no critic
62              
63             =head1 SYNOPSIS
64              
65             use 5.010;
66             use strict;
67             use warnings;
68              
69             use Scalar::Util qw(
70             blessed
71             );
72             use Try::Tiny;
73              
74             use Kafka qw(
75             $DEFAULT_MAX_BYTES
76             $DEFAULT_MAX_NUMBER_OF_OFFSETS
77             $RECEIVE_EARLIEST_OFFSET
78             );
79             use Kafka::Connection;
80             use Kafka::Consumer;
81              
82             my ( $connection, $consumer );
83             try {
84              
85             #-- Connection
86             $connection = Kafka::Connection->new( host => 'localhost' );
87              
88             #-- Consumer
89             $consumer = Kafka::Consumer->new( Connection => $connection );
90              
91             # Get a valid offset before the given time
92             my $offsets = $consumer->offset_before_time(
93             'mytopic', # topic
94             0, # partition
95             (time()-3600) * 1000, # time
96             );
97              
98             if ( @$offsets ) {
99             say "Received offset: $_" foreach @$offsets;
100             } else {
101             warn "Error: Offsets are not received\n";
102             }
103              
104             # Consuming messages
105             my $messages = $consumer->fetch(
106             'mytopic', # topic
107             0, # partition
108             0, # offset
109             $DEFAULT_MAX_BYTES # Maximum size of MESSAGE(s) to receive
110             );
111              
112             if ( $messages ) {
113             foreach my $message ( @$messages ) {
114             if ( $message->valid ) {
115             say 'payload : ', $message->payload;
116             say 'key : ', $message->key;
117             say 'offset : ', $message->offset;
118             say 'next_offset: ', $message->next_offset;
119             } else {
120             say 'error : ', $message->error;
121             }
122             }
123             }
124              
125             } catch {
126             my $error = $_;
127             if ( blessed( $error ) && $error->isa( 'Kafka::Exception' ) ) {
128             warn 'Error: (', $error->code, ') ', $error->message, "\n";
129             exit;
130             } else {
131             die $error;
132             }
133             };
134              
135             # Closes the consumer and cleans up
136             undef $consumer;
137             $connection->close;
138             undef $connection;
139              
140             =head1 DESCRIPTION
141              
142             Kafka consumer API is implemented by the C<Kafka::Consumer> class.
143              
144             The main features of the C<Kafka::Consumer> class are:
145              
146             =over 3
147              
148             =item *
149              
150             Provides an object-oriented API for consuming messages.
151              
152             =item *
153              
154             Provides Kafka FETCH and OFFSETS requests.
155              
156             =item *
157              
158             Supports parsing the Apache Kafka 0.9+ Wire Format protocol.
159              
160             =item *
161              
162             Works with 64-bit elements of the Kafka Wire Format protocol
163             on 32 bit systems.
164              
165             =back
166              
167             The Kafka consumer response returns ARRAY references for the C<offsets> and
168             C<fetch> methods.
169              
170             Array returned by C<offsets> contains offset integers.
171              
172             Array returned by C<fetch> contains objects of the L<Kafka::Message> class.
173              
174             =cut
175              
176             #-- constructor ----------------------------------------------------------------
177              
178             =head2 CONSTRUCTOR
179              
180             =head3 C<new>
181              
182             Creates a new consumer client object. Returns the created C<Kafka::Consumer> object.
183              
184             C<new()> takes arguments in key-value pairs. The following arguments are recognized:
185              
186             =over 3
187              
188             =item C<< Connection => $connection >>
189              
190             C<$connection> is the L<Kafka::Connection> object responsible for communication with
191             the Apache Kafka cluster.
192              
193             =item C<< ClientId => $client_id >>
194              
195             This is a user supplied identifier (string) for the client application.
196              
197             C<ClientId> will be auto-assigned if not passed in when creating the L<Kafka::Consumer> object.
198              
199             =item C<< MaxWaitTime => $max_time >>
200              
201             The maximum amount of time (seconds, may be fractional) to wait when no sufficient data is available at the time the
202             request was issued.
203              
204             Optional, default is C<$DEFAULT_MAX_WAIT_TIME>.
205              
206             C<$DEFAULT_MAX_WAIT_TIME> is the default time that can be imported from the
207             L<Kafka> module.
208              
209             The C<$max_time> must be a positive number.
210              
211             =item C<< MinBytes => $min_bytes >>
212              
213             The minimum number of bytes of messages that must be available to give a response.
214             If the client sets this to C<$MIN_BYTES_RESPOND_IMMEDIATELY> the server will always respond
215             immediately. If it is set to C<$MIN_BYTES_RESPOND_HAS_DATA>, the server will respond as soon
216             as at least one partition has at least 1 byte of data or the specified timeout occurs.
217             Setting higher values in combination with the bigger timeouts allows reading larger chunks of data.
218              
219             Optional, int32 signed integer, default is C<$MIN_BYTES_RESPOND_IMMEDIATELY>.
220              
221             C<$MIN_BYTES_RESPOND_IMMEDIATELY>, C<$MIN_BYTES_RESPOND_HAS_DATA> are the defaults that
222             can be imported from the L<Kafka> module.
223              
224             The C<$min_bytes> must be a non-negative int32 signed integer.
225              
226             =item C<< MaxBytes => $max_bytes >>
227              
228             The maximum bytes to include in the message set for this partition.
229              
230             Optional, int32 signed integer, default = C<$DEFAULT_MAX_BYTES> (1_000_000).
231              
232             The C<$max_bytes> must be more than C<$MESSAGE_SIZE_OVERHEAD>
233             (size of protocol overhead - data added by Kafka wire protocol to each message).
234              
235             C<$DEFAULT_MAX_BYTES>, C<$MESSAGE_SIZE_OVERHEAD>
236             are the defaults that can be imported from the L<Kafka> module.
237              
238             =item C<< MaxNumberOfOffsets => $max_number >>
239              
240             Limit the number of offsets returned by Kafka.
241              
242             That is a non-negative integer.
243              
244             Optional, int32 signed integer, default = C<$DEFAULT_MAX_NUMBER_OF_OFFSETS> (100).
245              
246             C<$DEFAULT_MAX_NUMBER_OF_OFFSETS>
247             is the default that can be imported from the L<Kafka> module.
248              
249             =back
250              
251             =cut
252             sub new {
253 179     179 1 264327 my ( $class, %p ) = @_;
254              
255 179         802 my $self = bless {
256             Connection => undef,
257             ClientId => undef,
258             MaxWaitTime => $DEFAULT_MAX_WAIT_TIME,
259             MinBytes => $MIN_BYTES_RESPOND_IMMEDIATELY,
260             MaxBytes => $DEFAULT_MAX_BYTES,
261             MaxNumberOfOffsets => $DEFAULT_MAX_NUMBER_OF_OFFSETS,
262             }, $class;
263              
264 179   66     1553 exists $p{$_} and $self->{$_} = $p{$_} foreach keys %$self;
265              
266 179   100     897 $self->{ClientId} //= 'consumer';
267              
268             $self->_error( $ERROR_MISMATCH_ARGUMENT, 'Connection' )
269 179 100       1321 unless _INSTANCE( $self->{Connection}, 'Kafka::Connection' );
270             $self->_error( $ERROR_MISMATCH_ARGUMENT, 'ClientId' )
271 159 100 66     1134 unless ( $self->{ClientId} eq q{} || defined( _STRING( $self->{ClientId} ) ) ) && !utf8::is_utf8( $self->{ClientId} );
      100        
272             $self->_error( $ERROR_MISMATCH_ARGUMENT, format_message( 'MaxWaitTime (%s)', $self->{MaxWaitTime} ) )
273 151 100 100     1170 unless defined( $self->{MaxWaitTime} ) && defined _NUMBER( $self->{MaxWaitTime} ) && int( $self->{MaxWaitTime} * 1000 ) >= 1 && int( $self->{MaxWaitTime} * 1000 ) <= $MAX_INT32;
      100        
      66        
274             $self->_error( $ERROR_MISMATCH_ARGUMENT, format_message( 'MinBytes (%s)', $self->{MinBytes} ) )
275 139 50 66     444 unless ( _isbig( $self->{MinBytes} ) ? ( $self->{MinBytes} >= 0 ) : defined( _NONNEGINT( $self->{MinBytes} ) ) ) && $self->{MinBytes} <= $MAX_INT32;
    100          
276             $self->_error( $ERROR_MISMATCH_ARGUMENT, format_message( 'MaxBytes (%s)', $self->{MaxBytes} ) )
277 126 50 100     1521 unless ( _isbig( $self->{MaxBytes} ) ? ( $self->{MaxBytes} > 0 ) : _POSINT( $self->{MaxBytes} ) ) && $self->{MaxBytes} >= $MESSAGE_SIZE_OVERHEAD && $self->{MaxBytes} <= $MAX_INT32;
    100 66        
278             $self->_error( $ERROR_MISMATCH_ARGUMENT, format_message( 'MaxNumberOfOffsets (%s)', $self->{MaxNumberOfOffsets} ) )
279 110 100 66     3313 unless defined( _POSINT( $self->{MaxNumberOfOffsets} ) ) && $self->{MaxNumberOfOffsets} <= $MAX_INT32;
280              
281 95         1174 return $self;
282             }
283              
284             #-- public attributes ----------------------------------------------------------
285              
286             =head2 METHODS
287              
288             The following methods are defined for the C<Kafka::Consumer> class:
289              
290             =cut
291              
292             #-- public methods -------------------------------------------------------------
293              
294             =head3 C<fetch( $topic, $partition, $start_offset, $max_size )>
295              
296             Get a list of messages to consume one by one up to C<$max_size> bytes.
297              
298             Returns the reference to an array of L<Kafka::Message> objects.
299              
300             C<fetch()> takes the following arguments:
301              
302             =over 3
303              
304             =item C<$topic>
305              
306             The C<$topic> must be a normal non-false string of non-zero length.
307              
308             =item C<$partition>
309              
310             The C<$partition> must be a non-negative integer.
311              
312             =item C<$start_offset>
313              
314             Offset in topic and partition to start from (64-bit integer).
315              
316             The argument must be a non-negative integer. The argument may be a
317             L<Math::BigInt> integer on a 32-bit system.
318              
319             =item C<$max_size>
320              
321             C<$max_size> is the maximum size of the messages set to return. The argument
322             must be a positive int32 signed integer.
323              
324             The maximum size of a request is limited by C<$MAX_SOCKET_REQUEST_BYTES>, which
325             can be imported from the L<Kafka> module.
326              
327             =back
328              
329             =cut
330             sub fetch {
331 5055     5055 1 177778 my ( $self, $topic, $partition, $start_offset, $max_size, $_return_all, $api_version ) = @_;
332             # Special argument: $_return_all - return redundant messages sent out of a compressed package posts
333              
334 5055 100 66     39064 $self->_error( $ERROR_MISMATCH_ARGUMENT, 'topic' )
      66        
      100        
335             unless defined( $topic ) && ( $topic eq q{} || defined( _STRING( $topic ) ) ) && !utf8::is_utf8( $topic );
336 5047 100 66     26600 $self->_error( $ERROR_MISMATCH_ARGUMENT, 'partition' )
      66        
337             unless defined( $partition ) && isint( $partition ) && $partition >= 0;
338 5037 100 66     14850 $self->_error( $ERROR_MISMATCH_ARGUMENT, 'offset' )
      100        
339             unless defined( $start_offset ) && ( ( _isbig( $start_offset ) && $start_offset >= 0 ) || defined( _NONNEGINT( $start_offset ) ) );
340 5024 100 66     52906 $self->_error( $ERROR_MISMATCH_ARGUMENT, format_message( 'max_size (%s)', $max_size ) )
      100        
      66        
      66        
341             unless ( !defined( $max_size ) || ( ( _isbig( $max_size ) || _POSINT( $max_size ) ) && $max_size >= $MESSAGE_SIZE_OVERHEAD && $max_size <= $MAX_INT32 ) );
342              
343             my $request = {
344             ApiKey => $APIKEY_FETCH,
345             ApiVersion => $api_version,
346             CorrelationId => _get_CorrelationId(),
347             ClientId => $self->{ClientId},
348             MaxWaitTime => int( $self->{MaxWaitTime} * 1000 ),
349             MinBytes => $self->{MinBytes},
350             MaxBytes => $max_size // $self->{MaxBytes},
351             topics => [
352             {
353             TopicName => $topic,
354             partitions => [
355             {
356             Partition => $partition,
357             FetchOffset => $start_offset,
358             MaxBytes => $max_size // $self->{MaxBytes},
359             },
360 5011   66     56957 ],
      66        
361             },
362             ],
363             };
364              
365 5011         17453 my $response = $self->{Connection}->receive_response_to_request( $request, undef, $self->{MaxWaitTime} );
366              
367 5009         8046 my $messages = [];
368 5009         6271 foreach my $received_topic ( @{ $response->{topics} } ) {
  5009         9125  
369             $received_topic->{TopicName} eq $topic
370 5009 50       9982 or $self->_error( $ERROR_TOPIC_DOES_NOT_MATCH, format_message( "'%s' ne '%s'", $topic, $received_topic->{TopicName} ) );
371 5009         5397 foreach my $received_partition ( @{ $received_topic->{partitions} } ) {
  5009         7432  
372             $received_partition->{Partition} == $partition
373 5009 50       9232 or $self->_error( $ERROR_PARTITION_DOES_NOT_MATCH, format_message( '%s != %s', $partition, $received_partition->{Partition} ) );
374 5009         5743 my $HighwaterMarkOffset = $received_partition->{HighwaterMarkOffset};
375 5009         5366 foreach my $Message ( @{ $received_partition->{MessageSet} } ) {
  5009         7543  
376 15018         18632 my $offset = $Message->{Offset};
377 15018         14188 my $next_offset;
378 15018 50       19505 if ( $BITS64 ) {
379 15018         18588 $next_offset += $offset + 1;
380             } else {
381 0         0 $offset = Kafka::Int64::intsum( $offset, 0 );
382 0         0 $next_offset = Kafka::Int64::intsum( $offset, 1 );
383             }
384              
385             # skip previous messages of a compressed package posts
386 15018 50 33     24615 next if $offset < $start_offset && !$_return_all;
387              
388 15018         16251 my $message_error = q{};
389             # According to Apache Kafka documentation:
390             # This byte holds metadata attributes about the message. The
391             # lowest 3 bits contain the compression codec used for the
392             # message. The fourth lowest bit represents the timestamp type.
393             # 0 stands for CreateTime and 1 stands for LogAppendTime. The
394             # producer should always set this bit to 0. (since 0.10.0).
395             # All other bits should be set to 0.
396 15018         16033 my $attributes = $Message->{Attributes};
397             # check that attributes is valid
398             $attributes & 0b11110000
399 15018 50       22062 and $message_error = $ERROR{ $ERROR_METADATA_ATTRIBUTES };
400              
401 15018 50       19232 if (my $compression_codec = $attributes & 0b00000111) {
402 0 0 0     0 unless ( $compression_codec == 1 # GZIP
403             || $compression_codec == 2 # Snappy
404             ) {
405 0         0 $message_error = $ERROR{ $ERROR_METADATA_ATTRIBUTES };
406             }
407             }
408              
409             push( @$messages, Kafka::Message->new( {
410             Attributes => $Message->{Attributes},
411             Timestamp => $Message->{Timestamp},
412             MagicByte => $Message->{MagicByte},
413             key => $Message->{Key},
414             payload => $Message->{Value},
415 15018         88077 offset => $offset,
416             next_offset => $next_offset,
417             error => $message_error,
418             valid => !$message_error,
419             HighwaterMarkOffset => $HighwaterMarkOffset,
420             } )
421             );
422             }
423             }
424             }
425              
426 5009         38346 return $messages;
427             }
428              
429             =head3 C<offset_at_time( $topic, $partition, $time )>
430              
431             Returns an offset, given a topic, partition and time.
432              
433             The returned offset is the earliest offset whose timestamp is greater than or
434             equal to the given timestamp. The return value is a HashRef, containing
435             C<timestamp> and C<offset> keys.
436              
437             B<WARNING>: this method requires Kafka 0.10.0, and messages with timestamps.
438             Check the configuration of the brokers or topic, specifically
439             C<message.timestamp.type>, and set it either to C<LogAppendTime> to have Kafka
440             automatically set message timestamps based on the broker clock, or
441             C<CreateTime>, in which case the client populating your topic has to set the
442             timestamps when producing messages.
443              
444             C<offset_at_time()> takes the following arguments:
445              
446             =over 3
447              
448             =item C<$topic>
449              
450             The C<$topic> must be a normal non-false string of non-zero length.
451              
452             =item C<$partition>
453              
454             The C<$partition> must be a non-negative integer.
455              
456             =item C<$time>
457              
458             Get the earliest offset at or after the given time (in milliseconds since UNIX Epoch).
459              
460             The argument must be a positive number.
461              
462             The argument may be a L<Math::BigInt> integer on a 32 bit
463             system.
464              
465             =back
466              
467             =cut
468              
469             sub offset_at_time {
470 0     0 1 0 my ( $self, $topic, $partition, $time ) = @_;
471              
472             # we don't accept special values for $time, we want a real timestamp
473 0 0 0     0 $self->_error( $ERROR_MISMATCH_ARGUMENT, 'time' )
      0        
      0        
474             unless defined( $time ) && ( _isbig( $time ) || isint( $time ) ) && $time >= 0;
475              
476             # no max_number, api version = 1
477 0         0 return $self->_query_offsets($topic, $partition, $time, undef, 1)->[0];
478             }
479              
480             =head3 C<offset_before_time( $topic, $partition, $time )>
481              
482             Returns an offset, given a topic, partition and time.
483              
484             The returned offset is an offset whose timestamp is guaranteed to be earlier
485             than the given timestamp. The return value is a Number
486              
487             This method works with all version of Kafka, and doesn't require messages with
488             timestamps.
489              
490             C<offset_before_time()> takes the following arguments:
491              
492             =over 3
493              
494             =item C<$topic>
495              
496             The C<$topic> must be a normal non-false string of non-zero length.
497              
498             =item C<$partition>
499              
500             The C<$partition> must be a non-negative integer.
501              
502             =item C<$time>
503              
504             Get offsets before the given time (in milliseconds since UNIX Epoch).
505              
506             The argument must be a positive number.
507              
508             The argument may be a L<Math::BigInt> integer on a 32 bit
509             system.
510              
511             =back
512              
513             =cut
514              
515             sub offset_before_time {
516 0     0 1 0 my ( $self, $topic, $partition, $time ) = @_;
517              
518             # we don't accept special values for $time, we want a real timestamp
519 0 0 0     0 $self->_error( $ERROR_MISMATCH_ARGUMENT, 'time' )
      0        
      0        
520             unless defined( $time ) && ( _isbig( $time ) || isint( $time ) ) && $time >= 0;
521             # $self->_error( $ERROR_MISMATCH_ARGUMENT, format_message( 'max_number (%s)', $max_number ) )
522             # unless !defined( $max_number ) || ( _POSINT( $max_number ) && $max_number <= $MAX_INT32 );
523              
524             # max_number = 1, api version = 0
525 0         0 return $self->_query_offsets($topic, $partition, $time, 1, 0)->[0];
526             }
527              
528             =head3 C<offset_earliest( $topic, $partition )>
529              
530             Returns the earliest offset for a given topic and partition
531              
532             C<offset_earliest()> takes the following arguments:
533              
534             =over 3
535              
536             =item C<$topic>
537              
538             The C<$topic> must be a normal non-false string of non-zero length.
539              
540             =item C<$partition>
541              
542             The C<$partition> must be a non-negative integer.
543              
544             =back
545              
546             =cut
547              
548             sub offset_earliest {
549 0     0 1 0 my ( $self, $topic, $partition ) = @_;
550              
551             # max_number = 1, api version = 0
552 0         0 return $self->_query_offsets($topic, $partition, $RECEIVE_EARLIEST_OFFSET, 1, 0)->[0];
553             }
554              
555             =head3 C<offset_latest( $topic, $partition )>
556              
557             Returns the latest offset for a given topic and partition
558              
559             C<offset_latest()> takes the following arguments:
560              
561             =over 3
562              
563             =item C<$topic>
564              
565             The C<$topic> must be a normal non-false string of non-zero length.
566              
567             =item C<$partition>
568              
569             The C<$partition> must be a non-negative integer.
570              
571             =back
572              
573             =cut
574              
575             sub offset_latest {
576 0     0 1 0 my ( $self, $topic, $partition ) = @_;
577              
578             # max_number = 1, api version = 0
579 0         0 return $self->_query_offsets($topic, $partition, $RECEIVE_LATEST_OFFSETS, 1, 0)->[0];
580             }
581              
582             =head3 C<offsets( $topic, $partition, $time, $max_number )>
583              
584             B<WARNING>: This method is DEPRECATED, please use one of C<offset_at_time()>, C<offset_before_time()>, C<offset_earliest()>, C<offset_latest()>. It is kept for backward compatibility.
585              
586             Returns an ArrayRef of offsets
587              
588             C<offsets()> takes the following arguments:
589              
590             =over 3
591              
592             =item C<$topic>
593              
594             The C<$topic> must be a normal non-false string of non-zero length.
595              
596             =item C<$partition>
597              
598             The C<$partition> must be a non-negative integer.
599              
600             =item C<$time>
601              
602             Get offsets before the given time (in milliseconds since UNIX Epoch). It must
603             be a positive number. It may be a L<Math::BigInt> integer on a 32
604             bit system.
605              
606             The special values C<$RECEIVE_LATEST_OFFSETS> (-1), C<$RECEIVE_EARLIEST_OFFSET>
607             (-2) are allowed. They can be imported from the L<Kafka> module.
608              
609             =item C<$max_number>
610              
611             Maximum number of offsets to be returned
612              
613             =back
614              
615             =cut
616              
617             sub offsets {
618 63     63 1 3341851 my ( $self, $topic, $partition, $time, $max_number ) = @_;
619              
620 63 100 66     289 $self->_error( $ERROR_MISMATCH_ARGUMENT, 'time' )
      66        
      100        
621             unless defined( $time ) && ( _isbig( $time ) || isint( $time ) ) && $time >= $RECEIVE_EARLIEST_OFFSET;
622 52 100 66     1509 $self->_error( $ERROR_MISMATCH_ARGUMENT, format_message( 'max_number (%s)', $max_number ) )
      66        
623             unless !defined( $max_number ) || ( _POSINT( $max_number ) && $max_number <= $MAX_INT32 );
624              
625 38         471 return $self->_query_offsets($topic, $partition, $time, $max_number, 0);
626             }
627              
628             sub _query_offsets {
629 38     38   107 my ( $self, $topic, $partition, $time, $max_number, $api_version ) = @_;
630              
631 38 100 66     315 $self->_error( $ERROR_MISMATCH_ARGUMENT, 'topic' )
      66        
      100        
632             unless defined( $topic) && ( $topic eq q{} || defined( _STRING( $topic ) ) ) && !utf8::is_utf8( $topic );
633 30 100 66     200 $self->_error( $ERROR_MISMATCH_ARGUMENT, 'partition' )
      66        
634             unless defined( $partition ) && isint( $partition ) && $partition >= 0;
635              
636 20         40 my $is_v1 = $api_version == 1;
637              
638             my $request = {
639             ApiKey => $APIKEY_OFFSET,
640             ApiVersion => $api_version,
641             CorrelationId => _get_CorrelationId(),
642             ClientId => $self->{ClientId},
643             topics => [
644             {
645             TopicName => $topic,
646             partitions => [
647             {
648             Partition => $partition,
649             Time => $time,
650             MaxNumberOfOffsets => $max_number // $self->{MaxNumberOfOffsets},
651             },
652 20   66     67 ],
653             },
654             ],
655             };
656              
657 20         87 my $response = $self->{Connection}->receive_response_to_request( $request );
658              
659 18         34 my $offsets = [];
660             # because we accepted only one topic and partition, we are sure that the
661             # response is all about this single topic and partition, so we can merge
662             # the offsets.
663 18 50       39 if ($is_v1) {
664 0         0 foreach my $received_topic ( @{ $response->{topics} } ) {
  0         0  
665 0         0 foreach my $partition_offsets ( @{ $received_topic->{PartitionOffsets} } ) {
  0         0  
666             push @$offsets, { timestamp => $partition_offsets->{Timestamp},
667 0         0 offset => $partition_offsets->{Offset} };
668             }
669             }
670             } else {
671 18         20 foreach my $received_topic ( @{ $response->{topics} } ) {
  18         38  
672 18         21 foreach my $partition_offsets ( @{ $received_topic->{PartitionOffsets} } ) {
  18         30  
673 18         22 push @$offsets, @{ $partition_offsets->{Offset} };
  18         63  
674             }
675             }
676             }
677              
678 18         102 return $offsets;
679             }
680              
681             =head3 C<commit_offsets( $topic, $partition, $offset, $group )>
682              
683             Commit offsets using the offset commit/fetch API.
684              
685             Returns a non-blank value (a reference to a hash with server response description)
686             if the message is successfully sent.
687              
688             C<commit_offsets()> takes the following arguments:
689              
690             =over 3
691              
692             =item C<$topic>
693              
694             The C<$topic> must be a normal non-false string of non-zero length.
695              
696             =item C<$partition>
697              
698             The C<$partition> must be a non-negative integer.
699              
700             =item C<$offset>
701              
702             Offset in topic and partition to commit.
703              
704             The argument must be a positive number.
705              
706             The argument may be a L<Math::BigInt> integer on a 32 bit
707             system.
708              
709             =item C<$group>
710              
711             The name of the consumer group
712              
713             The argument must be a normal non-false string of non-zero length.
714              
715             =back
716              
717             =cut
# Commit one offset for a single topic/partition on behalf of a consumer group.
#
# Arguments (positional): $topic, $partition, $offset, $group.
# Every argument is validated first; a failed check is reported through
# $self->_error with $ERROR_MISMATCH_ARGUMENT and the argument's name.
# Returns whatever the connection's receive_response_to_request() returns.
sub commit_offsets {
    my ( $self, $topic, $partition, $offset, $group ) = @_;

    # Topic: defined, either empty or accepted by _STRING, and not UTF-8 flagged.
    if (   !defined( $topic )
        || !( $topic eq q{} || defined( _STRING( $topic ) ) )
        || utf8::is_utf8( $topic ) )
    {
        $self->_error( $ERROR_MISMATCH_ARGUMENT, 'topic' );
    }

    # Partition: a defined integer that is not negative.
    if ( !defined( $partition ) || !isint( $partition ) || !( $partition >= 0 ) ) {
        $self->_error( $ERROR_MISMATCH_ARGUMENT, 'partition' );
    }

    # Offset: either a value recognised by _isbig that is >= 0,
    # or an ordinary non-negative integer.
    if (   !defined( $offset )
        || !( ( _isbig( $offset ) && $offset >= 0 ) || defined( _NONNEGINT( $offset ) ) ) )
    {
        $self->_error( $ERROR_MISMATCH_ARGUMENT, 'offset' );
    }

    # Group: same rules as the topic name.
    if (   !defined( $group )
        || !( $group eq q{} || defined( _STRING( $group ) ) )
        || utf8::is_utf8( $group ) )
    {
        $self->_error( $ERROR_MISMATCH_ARGUMENT, 'group' );
    }

    # Assemble the request from the inside out: partition -> topic -> envelope.
    my %partition_entry = (
        Partition => $partition,
        Offset    => $offset,
        Metadata  => '',
    );

    my %topic_entry = (
        TopicName  => $topic,
        partitions => [ \%partition_entry ],
    );

    my %commit_request = (
        __send_to__   => 'group_coordinator',
        ApiKey        => $APIKEY_OFFSETCOMMIT,
        CorrelationId => _get_CorrelationId(),
        ClientId      => $self->{ClientId},
        GroupId       => $group,
        topics        => [ \%topic_entry ],
    );

    return $self->{Connection}->receive_response_to_request( \%commit_request );
}
753              
754             =head3 C<fetch_offsets( $topic, $partition, $group )>
755              
756             Fetch committed offsets using the offset commit/fetch API.
757              
758             Returns a non-blank value (a reference to a hash with server response description)
759             if the message is successfully sent.
760              
761             C<fetch_offsets> takes the following arguments:
762              
763             =over 3
764              
765             =item C<$topic>
766              
767             The C<$topic> must be a normal non-false string of non-zero length.
768              
769             =item C<$partition>
770              
771             The C<$partition> must be a non-negative integer.
772              
773             =item C<$group>
774              
775             The name of the consumer group.
776              
777             The argument must be a normal non-false string of non-zero length.
778              
779             =back
780              
781             =cut
# Fetch the committed offset of a single topic/partition for a consumer group.
#
# Arguments (positional): $topic, $partition, $group.
# Every argument is validated first; a failed check is reported through
# $self->_error with $ERROR_MISMATCH_ARGUMENT and the argument's name.
# Returns whatever the connection's receive_response_to_request() returns.
sub fetch_offsets {
    my ( $self, $topic, $partition, $group ) = @_;

    # Topic: defined, either empty or accepted by _STRING, and not UTF-8 flagged.
    if (   !defined( $topic )
        || !( $topic eq q{} || defined( _STRING( $topic ) ) )
        || utf8::is_utf8( $topic ) )
    {
        $self->_error( $ERROR_MISMATCH_ARGUMENT, 'topic' );
    }

    # Partition: a defined integer that is not negative.
    if ( !defined( $partition ) || !isint( $partition ) || !( $partition >= 0 ) ) {
        $self->_error( $ERROR_MISMATCH_ARGUMENT, 'partition' );
    }

    # Group: same rules as the topic name.
    if (   !defined( $group )
        || !( $group eq q{} || defined( _STRING( $group ) ) )
        || utf8::is_utf8( $group ) )
    {
        $self->_error( $ERROR_MISMATCH_ARGUMENT, 'group' );
    }

    # Assemble the request from the inside out: partition -> topic -> envelope.
    my %partition_entry = (
        Partition => $partition,
    );

    my %topic_entry = (
        TopicName  => $topic,
        partitions => [ \%partition_entry ],
    );

    my %fetch_request = (
        __send_to__   => 'group_coordinator',
        ApiKey        => $APIKEY_OFFSETFETCH,
        CorrelationId => _get_CorrelationId(),
        ClientId      => $self->{ClientId},
        GroupId       => $group,
        topics        => [ \%topic_entry ],
    );

    return $self->{Connection}->receive_response_to_request( \%fetch_request );
}
813              
814             #-- private attributes ---------------------------------------------------------
815              
816             #-- private methods ------------------------------------------------------------
817              
818             # Handler for errors
sub _error {
    # Drop the invocant; the remaining arguments describe the error.
    my $self = shift @_;

    # Convert the error description into exception constructor arguments,
    # then raise it as a consumer exception.
    my @exception_fields = throw_args( @_ );
    Kafka::Exception::Consumer->throw( @exception_fields );

    return;
}
826              
827              
828              
829             1;
830              
831             __END__