File Coverage

lib/Kafka/Connection.pm
Criterion Covered Total %
statement 363 472 76.9
branch 139 250 55.6
condition 74 155 47.7
subroutine 54 60 90.0
pod 11 11 100.0
total 641 948 67.6


line stmt bran cond sub pod time code
1             package Kafka::Connection;
2              
3             =head1 NAME
4              
5             Kafka::Connection - Object interface to connect to a kafka cluster.
6              
7             =head1 VERSION
8              
9             This documentation refers to C<Kafka::Connection> version 1.06.
10              
11             =cut
12              
13 8     8   132731 use 5.010;
  8         31  
14 8     8   45 use strict;
  8         14  
  8         190  
15 8     8   47 use warnings;
  8         18  
  8         510  
16              
17             our $DEBUG = 0;
18              
19             our $VERSION = '1.06';
20              
21 8         388 use Exporter qw(
22             import
23 8     8   46 );
  8         16  
24             our @EXPORT = qw(
25             %RETRY_ON_ERRORS
26             );
27              
28 8         458 use Data::Validate::Domain qw(
29             is_hostname
30 8     8   2096 );
  8         55749  
31 8         569 use Data::Validate::IP qw(
32             is_ipv4
33             is_ipv6
34 8     8   2616 );
  8         163545  
35 8     8   717 use Const::Fast;
  8         3071  
  8         72  
36 8         632 use List::Util qw(
37             shuffle
38 8     8   602 );
  8         20  
39 8         638 use Params::Util qw(
40             _ARRAY
41             _ARRAY0
42             _HASH
43             _NONNEGINT
44             _NUMBER
45             _POSINT
46             _STRING
47 8     8   1821 );
  8         8027  
48 8         377 use Scalar::Util qw(
49             blessed
50 8     8   62 );
  8         25  
51 8         460 use Scalar::Util::Numeric qw(
52             isint
53 8     8   3410 );
  8         4475  
54 8         295 use Storable qw(
55             dclone
56 8     8   54 );
  8         20  
57 8     8   1643 use Time::HiRes ();
  8         3887  
  8         165  
58 8     8   2353 use Try::Tiny;
  8         8636  
  8         733  
59              
60 8         3009 use Kafka qw(
61             %ERROR
62              
63             $ERROR_NO_ERROR
64             $ERROR_UNKNOWN
65             $ERROR_OFFSET_OUT_OF_RANGE
66             $ERROR_INVALID_MESSAGE
67             $ERROR_UNKNOWN_TOPIC_OR_PARTITION
68             $ERROR_INVALID_FETCH_SIZE
69             $ERROR_LEADER_NOT_AVAILABLE
70             $ERROR_NOT_LEADER_FOR_PARTITION
71             $ERROR_REQUEST_TIMED_OUT
72             $ERROR_BROKER_NOT_AVAILABLE
73             $ERROR_REPLICA_NOT_AVAILABLE
74             $ERROR_MESSAGE_TOO_LARGE
75             $ERROR_STALE_CONTROLLER_EPOCH
76             $ERROR_NETWORK_EXCEPTION
77             $ERROR_GROUP_LOAD_IN_PROGRESS
78             $ERROR_OFFSET_METADATA_TOO_LARGE
79             $ERROR_GROUP_COORDINATOR_NOT_AVAILABLE
80             $ERROR_NOT_COORDINATOR_FOR_GROUP
81             $ERROR_NOT_ENOUGH_REPLICAS
82             $ERROR_NOT_ENOUGH_REPLICAS_AFTER_APPEND
83             $ERROR_REBALANCE_IN_PROGRESS
84             $ERROR_UNSUPPORTED_VERSION
85              
86             $ERROR_CANNOT_BIND
87             $ERROR_CANNOT_GET_METADATA
88             $ERROR_CANNOT_RECV
89             $ERROR_CANNOT_SEND
90             $ERROR_LEADER_NOT_FOUND
91             $ERROR_GROUP_COORDINATOR_NOT_FOUND
92             $ERROR_MISMATCH_ARGUMENT
93             $ERROR_MISMATCH_CORRELATIONID
94             $ERROR_NO_KNOWN_BROKERS
95             $ERROR_RESPONSEMESSAGE_NOT_RECEIVED
96             $ERROR_SEND_NO_ACK
97             $ERROR_UNKNOWN_APIKEY
98             $ERROR_INCOMPATIBLE_HOST_IP_VERSION
99             $ERROR_NO_CONNECTION
100              
101             $IP_V4
102             $IP_V6
103             $KAFKA_SERVER_PORT
104             $NOT_SEND_ANY_RESPONSE
105             $REQUEST_TIMEOUT
106             $RETRY_BACKOFF
107             $SEND_MAX_ATTEMPTS
108 8     8   514 );
  8         21  
109              
110 8     8   1410 use Kafka::Exceptions;
  8         22  
  8         476  
111 8         1183 use Kafka::Internals qw(
112             $APIKEY_FETCH
113             $APIKEY_METADATA
114             $APIKEY_OFFSET
115             $APIKEY_PRODUCE
116             $APIKEY_FINDCOORDINATOR
117             $APIKEY_APIVERSIONS
118             $APIKEY_OFFSETCOMMIT
119             $APIKEY_OFFSETFETCH
120             $MAX_CORRELATIONID
121             $MAX_INT32
122             debug_level
123             _get_CorrelationId
124             format_message
125 8     8   57 );
  8         15  
126 8     8   1573 use Kafka::IO;
  8         21  
  8         371  
127 8         47079 use Kafka::Protocol qw(
128             $BAD_OFFSET
129             $IMPLEMENTED_APIVERSIONS
130             decode_fetch_response
131             decode_metadata_response
132             decode_offset_response
133             decode_produce_response
134             decode_api_versions_response
135             decode_find_coordinator_response
136             decode_offsetcommit_response
137             decode_offsetfetch_response
138             encode_fetch_request
139             encode_metadata_request
140             encode_offset_request
141             encode_produce_request
142             encode_api_versions_request
143             encode_find_coordinator_request
144             encode_offsetcommit_request
145             encode_offsetfetch_request
146 8     8   3982 );
  8         30  
147              
148             =head1 SYNOPSIS
149              
150             use 5.010;
151             use strict;
152             use warnings;
153              
154             use Scalar::Util qw(
155             blessed
156             );
157             use Try::Tiny;
158              
159             # A simple example of Kafka::Connection usage:
160             use Kafka::Connection;
161              
162             # connect to local cluster with the defaults
163             my $connection;
164             try {
165             $connection = Kafka::Connection->new( host => 'localhost' );
166             } catch {
167             my $error = $_;
168             if ( blessed( $error ) && $error->isa( 'Kafka::Exception' ) ) {
169             warn $error->message, "\n", $error->trace->as_string, "\n";
170             exit;
171             } else {
172             die $error;
173             }
174             };
175              
176             # Closes the connection and cleans up
177             $connection->close;
178             undef $connection;
179              
180             =head1 DESCRIPTION
181              
182             The main features of the C<Kafka::Connection> class are:
183              
184             =over 3
185              
186             =item *
187              
188             Provides API for communication with Kafka 0.9+ cluster.
189              
190             =item *
191              
192             Performs request encoding and response decoding, and provides automatic
193             selection or promotion of a leader server in the Kafka cluster.
194              
195             =item *
196              
197             Provides information about Kafka cluster.
198              
199             =back
200              
201             =cut
202              
203             my %protocol = (
204             "$APIKEY_PRODUCE" => {
205             decode => \&decode_produce_response,
206             encode => \&encode_produce_request,
207             },
208             "$APIKEY_FETCH" => {
209             decode => \&decode_fetch_response,
210             encode => \&encode_fetch_request,
211             },
212             "$APIKEY_OFFSET" => {
213             decode => \&decode_offset_response,
214             encode => \&encode_offset_request,
215             },
216             "$APIKEY_METADATA" => {
217             decode => \&decode_metadata_response,
218             encode => \&encode_metadata_request,
219             },
220             "$APIKEY_APIVERSIONS" => {
221             decode => \&decode_api_versions_response,
222             encode => \&encode_api_versions_request,
223             },
224             "$APIKEY_FINDCOORDINATOR" => {
225             decode => \&decode_find_coordinator_response,
226             encode => \&encode_find_coordinator_request,
227             },
228             "$APIKEY_OFFSETCOMMIT" => {
229             decode => \&decode_offsetcommit_response,
230             encode => \&encode_offsetcommit_request,
231             },
232             "$APIKEY_OFFSETFETCH" => {
233             decode => \&decode_offsetfetch_response,
234             encode => \&encode_offsetfetch_request,
235             },
236             );
237              
238             =head2 EXPORT
239              
240             The following constants are available for export
241              
242             =cut
243              
244             =head3 C<%RETRY_ON_ERRORS>
245              
246             These are non-fatal errors: when one of them happens, the metadata is refreshed from Kafka and
247             another attempt to fetch data is made.
248              
249             =cut
250             # When any of the following error happens, a possible change in meta-data on server is expected.
251             const our %RETRY_ON_ERRORS => (
252             # $ERROR_NO_ERROR => 1, # 0 - No error
253             $ERROR_UNKNOWN => 1, # -1 - An unexpected server error
254             # $ERROR_OFFSET_OUT_OF_RANGE => 1, # 1 - The requested offset is not within the range of offsets maintained by the server
255             $ERROR_INVALID_MESSAGE => 1, # 2 - Retriable - This message has failed its CRC checksum, exceeds the valid size, or is otherwise corrupt
256             $ERROR_UNKNOWN_TOPIC_OR_PARTITION => 1, # 3 - Retriable - This server does not host this topic-partition
257             # $ERROR_INVALID_FETCH_SIZE => 1, # 4 - The requested fetch size is invalid
258             $ERROR_LEADER_NOT_AVAILABLE => 1, # 5 - Retriable - Unable to write due to ongoing Kafka leader selection
259             $ERROR_NOT_LEADER_FOR_PARTITION => 1, # 6 - Retriable - Server is not a leader for partition
260             $ERROR_REQUEST_TIMED_OUT => 1, # 7 - Retriable - Request time-out
261             $ERROR_BROKER_NOT_AVAILABLE => 1, # 8 - Broker is not available
262             $ERROR_REPLICA_NOT_AVAILABLE => 1, # 9 - Replica not available
263             # $ERROR_MESSAGE_TOO_LARGE => 1, # 10 - The request included a message larger than the max message size the server will accept
264             $ERROR_STALE_CONTROLLER_EPOCH => 1, # 11 - The controller moved to another broker
265             # $ERROR_OFFSET_METADATA_TOO_LARGE => 1, # 12 - The metadata field of the offset request was too large
266             $ERROR_NETWORK_EXCEPTION => 1, # 13 Retriable - The server disconnected before a response was received
267             $ERROR_GROUP_LOAD_IN_PROGRESS => 1, # 14 - Retriable - The coordinator is loading and hence can't process requests for this group
268             $ERROR_GROUP_COORDINATOR_NOT_AVAILABLE => 1, # 15 - Retriable - The group coordinator is not available
269             $ERROR_NOT_COORDINATOR_FOR_GROUP => 1, # 16 - Retriable - This is not the correct coordinator for this group
270              
271             # $ERROR_INVALID_TOPIC_EXCEPTION => 1, # 17 - The request attempted to perform an operation on an invalid topic
272             # $ERROR_RECORD_LIST_TOO_LARGE => 1, # 18 - The request included message batch larger than the configured segment size on the server
273             $ERROR_NOT_ENOUGH_REPLICAS => 1, # 19 - Retriable - Messages are rejected since there are fewer in-sync replicas than required
274             $ERROR_NOT_ENOUGH_REPLICAS_AFTER_APPEND => 1, # 20 - Retriable - Messages are written to the log, but to fewer in-sync replicas than required
275             # $ERROR_INVALID_REQUIRED_ACKS => 1, # 21 - Produce request specified an invalid value for required acks
276             # $ERROR_ILLEGAL_GENERATION => 1, # 22 - Specified group generation id is not valid
277             # $ERROR_INCONSISTENT_GROUP_PROTOCOL => 1, # 23 - The group member's supported protocols are incompatible with those of existing members
278             # $ERROR_INVALID_GROUP_ID => 1, # 24 - The configured groupId is invalid
279             # $ERROR_UNKNOWN_MEMBER_ID => 1, # 25 - The coordinator is not aware of this member
280             # $ERROR_INVALID_SESSION_TIMEOUT => 1, # 26 - The session timeout is not within the range allowed by the broker (as configured by group.min.session.timeout.ms and group.max.session.timeout.ms)
281             $ERROR_REBALANCE_IN_PROGRESS => 1, # 27 - The group is rebalancing, so a rejoin is needed
282             # $ERROR_INVALID_COMMIT_OFFSET_SIZE => 1, # 28 - The committing offset data size is not valid
283             # $ERROR_TOPIC_AUTHORIZATION_FAILED => 1, # 29 - Not authorized to access topics: Topic authorization failed
284             # $ERROR_GROUP_AUTHORIZATION_FAILED => 1, # 30 - Not authorized to access group: Group authorization failed
285             # $ERROR_CLUSTER_AUTHORIZATION_FAILED => 1, # 31 - Cluster authorization failed
286             # $ERROR_INVALID_TIMESTAMP => 1, # 32 - The timestamp of the message is out of acceptable range
287             # $ERROR_UNSUPPORTED_SASL_MECHANISM => 1, # 33 - The broker does not support the requested SASL mechanism
288             # $ERROR_ILLEGAL_SASL_STATE => 1, # 34 - Request is not valid given the current SASL state
289             # $ERROR_UNSUPPORTED_VERSION => 1, # 35 - The version of API is not supported
290             $ERROR_NO_CONNECTION => 1, # may be disconnected due to idle timeout etc.
291             );
292              
293             #-- constructor ----------------------------------------------------------------
294              
295             =head2 CONSTRUCTOR
296              
297             =head3 C<new>
298              
299             Creates a C<Kafka::Connection> object for interaction with a Kafka cluster.
300             Returns the created C<Kafka::Connection> object.
301              
302             C<new> takes arguments in key-value pairs. The following arguments are currently recognized:
303              
304             =over 3
305              
306             =item C<host =E<gt> $host>
307              
308             C<$host> is any Apache Kafka cluster host to connect to. It can be a hostname or the
309             IP-address in the "xx.xx.xx.xx" form.
310              
311             Optional. Either C<host> or C<broker_list> must be supplied.
312              
313             WARNING:
314              
315             Make sure that you always connect to brokers using EXACTLY the same address or host name
316             as specified in broker configuration (host.name in server.properties).
317             Avoid using default value (when host.name is commented out) in server.properties - always use explicit value instead.
318              
319             =item C<port =E<gt> $port>
320              
321             Optional, default = C<$KAFKA_SERVER_PORT>.
322              
323             C<$port> is the attribute denoting the port number of the service we want to
324             access (Apache Kafka service). C<$port> should be a positive integer.
325              
326             C<$KAFKA_SERVER_PORT> is the default Apache Kafka server port constant (C<9092>) that can
327             be imported from the L module.
328              
329             =item C<broker_list =E<gt> $broker_list>
330              
331             Optional, C<$broker_list> is a reference to an array of host:port or [IPv6_host]:port strings, defining the list
332             of Kafka servers. This list will be used to locate the new leader if the server specified
333             via the C<host =E<gt> $host> and C<port =E<gt> $port> arguments becomes unavailable. Either C<host>
334             or C<broker_list> must be supplied.
335              
336             =item C<ip_version =E<gt> $ip_version>
337              
338             Specifies the IP version used to interpret the passed IP address and to resolve the host name.
339              
340             Optional, undefined by default, which works in the following way: version of IP address
341             is detected automatically, host name is resolved into IPv4 address.
342              
343             See description of L<$IP_V4|Kafka::IO/$IP_V4>, L<$IP_V6|Kafka::IO/$IP_V6>
344             in C L.
345              
346             =item C<timeout =E<gt> $timeout>
347              
348             Optional, default = C<$Kafka::REQUEST_TIMEOUT>.
349              
350             C<$timeout> specifies how long we wait for the remote server to respond.
351             C<$timeout> is in seconds: a positive integer or floating-point number of at least 0.001 (one millisecond) whose value in milliseconds does not exceed the maximum positive int32 value.
352              
353             Special behavior when C<timeout> is set to C<undef>:
354              
355             =back
356              
357             =over 3
358              
359             =item *
360              
361             Alarms are not used internally (namely when performing C).
362              
363             =item *
364              
365             Default C<$REQUEST_TIMEOUT> is used for the rest of IO operations.
366              
367             =back
368              
369             =over 3
370              
371             =item C<SEND_MAX_ATTEMPTS =E<gt> $attempts>
372              
373             Optional, positive integer, default = C<$Kafka::SEND_MAX_ATTEMPTS>.
374              
375             In some circumstances (leader is temporarily unavailable, outdated metadata, etc) we may fail to send a message.
376             This property specifies the maximum number of attempts to send a message.
377             The C<$attempts> should be an integer number.
378              
379             =item C<RETRY_BACKOFF =E<gt> $backoff>
380              
381             Optional, default = C<$Kafka::RETRY_BACKOFF> .
382              
383             Since leader election takes a bit of time, this property specifies the amount of time,
384             in milliseconds, that the producer waits before refreshing the metadata.
385             The C<$backoff> should be an integer number.
386              
387             =item C<AutoCreateTopicsEnable =E<gt> $mode>
388              
389             Optional, default value is 0 (false).
390              
391             Kafka BUG "[KAFKA-1124]" (Fixed in Kafka 0.8.2):
392             C<AutoCreateTopicsEnable> controls how this module handles the first access to a non-existent topic
393             when C<auto.create.topics.enable> in the server configuration is C<true>.
394             If C<AutoCreateTopicsEnable> is false (default),
395             the first access to non-existent topic produces an exception;
396             however, the topic is created and next attempts to access it will succeed.
397              
398             If C<AutoCreateTopicsEnable> is true, this module waits
399             (according to the C<SEND_MAX_ATTEMPTS> and C<RETRY_BACKOFF> properties)
400             until the topic is created,
401             to avoid errors on the first access to non-existent topic.
402              
403             If C<auto.create.topics.enable> in the server configuration is C<false>, this setting has no effect.
404              
405             =item C<MaxLoggedErrors =E<gt> $number>
406              
407             Optional, default value is 100.
408              
409             Defines maximum number of last non-fatal errors that we keep in log. Use method L to
410             access those errors.
411              
412             =item C<dont_load_supported_api_versions =E<gt> $boolean>
413              
414             Optional, default value is 0 (false).
415              
416             If set to false, when communicating with a broker, the client will
417             automatically try to find out the best version numbers to use for each of the
418             API endpoints.
419              
420             If set to true, the client will always use
421             C<$Kafka::Protocol::DEFAULT_APIVERSION> as API version.
422              
423             WARNING: API versions are supported starting from Kafka 0.10. Set this parameter to true
424             if you're connecting to 0.9.
425              
426             =back
427              
428             =cut
429             sub new {
430 233     233 1 172433 my ( $class, %p ) = @_;
431              
432 233         2151 my $self = bless {
433             host => q{},
434             port => $KAFKA_SERVER_PORT,
435             broker_list => [],
436             timeout => $REQUEST_TIMEOUT,
437             ip_version => undef,
438             SEND_MAX_ATTEMPTS => $SEND_MAX_ATTEMPTS,
439             RETRY_BACKOFF => $RETRY_BACKOFF,
440             AutoCreateTopicsEnable => 0,
441             MaxLoggedErrors => 100,
442             dont_load_supported_api_versions => 0,
443             }, $class;
444              
445 233   66     3051 exists $p{$_} and $self->{$_} = $p{$_} foreach keys %$self;
446              
447             $self->_error( $ERROR_MISMATCH_ARGUMENT, 'host' )
448 233 100 100     2728 unless defined( $self->{host} ) && ( $self->{host} eq q{} || defined( _STRING( $self->{host} ) ) ) && !utf8::is_utf8( $self->{host} );
      100        
      100        
449             $self->_error( $ERROR_MISMATCH_ARGUMENT, 'port' )
450 224 100       6433 unless _POSINT( $self->{port} );
451             $self->_error( $ERROR_MISMATCH_ARGUMENT, format_message( 'timeout (%s)', $self->{timeout} ) )
452 209 50 66     4264 unless !defined( $self->{timeout} ) || ( defined _NUMBER( $self->{timeout} ) && int( 1000 * $self->{timeout} ) >= 1 && int( $self->{timeout} * 1000 ) <= $MAX_INT32 );
      66        
      33        
453             $self->_error( $ERROR_MISMATCH_ARGUMENT, 'broker_list' )
454 199 100       783 unless _ARRAY0( $self->{broker_list} );
455             $self->_error( $ERROR_MISMATCH_ARGUMENT, 'SEND_MAX_ATTEMPTS' )
456 183 100       3820 unless _POSINT( $self->{SEND_MAX_ATTEMPTS} );
457             $self->_error( $ERROR_MISMATCH_ARGUMENT, 'RETRY_BACKOFF' )
458 168 100       4218 unless _POSINT( $self->{RETRY_BACKOFF} );
459             $self->_error( $ERROR_MISMATCH_ARGUMENT, 'MaxLoggedErrors' )
460 153 50       4039 unless defined( _NONNEGINT( $self->{MaxLoggedErrors} ) );
461              
462 153         1548 my $ip_version = $self->{ip_version};
463 153 100 66     439 $self->_error( $ERROR_MISMATCH_ARGUMENT, format_message( 'ip_version (%s)', $ip_version ) )
      66        
      66        
464             unless ( !defined( $ip_version ) || ( defined( _NONNEGINT( $ip_version ) ) && ( $ip_version == $IP_V4 || $ip_version == $IP_V6 ) ) );
465              
466 151         378 $self->{_metadata} = {}; # {
467             # TopicName => {
468             # Partition => {
469             # 'Leader' => ...,
470             # 'Replicas' => [
471             # ...,
472             # ],
473             # 'Isr' => [
474             # ...,
475             # ],
476             # },
477             # ...,
478             # },
479             # ...,
480             # }
481 151         336 $self->{_leaders} = {}; # {
482             # NodeId => host:port or [IPv6_host]:port,
483             # ...,
484             # }
485 151         354 $self->{_group_coordinators} = {}; # {
486             # GroupId => host:port or [IPv6_host]:port,
487             # ...,
488             # }
489 151         359 $self->{_nonfatal_errors} = [];
490 151         347 my $IO_cache = $self->{_IO_cache} = {}; # host:port or [IPv6_host]:port => {
491             # 'NodeId' => ...,
492             # 'IO' => ...,
493             # 'timeout' => ...,
494             # 'host' => ...,
495             # 'port' => ...,
496             # 'error' => ...,
497             # },
498             # ...,
499              
500             # init IO cache
501 151 100       878 foreach my $server ( ( $self->{host} ? $self->_build_server_name( $self->{host}, $self->{port} ) : (), @{ $self->{broker_list} } ) ) {
  151         479  
502 169 100       474 $self->_error( $ERROR_MISMATCH_ARGUMENT, 'bad host:port or broker_list element' )
503             unless $self->_is_like_server( $server );
504 150         372 my ( $host, $port ) = _split_host_port( $server );
505 150         420 my $correct_server = $self->_build_server_name( $host, $port );
506 150         868 $IO_cache->{ $correct_server } = {
507             NodeId => undef,
508             IO => undef,
509             host => $host,
510             port => $port,
511             };
512             }
513              
514 132 100       495 $self->_error( $ERROR_MISMATCH_ARGUMENT, 'server is not specified' )
515             unless keys( %$IO_cache );
516              
517 131         2223 return $self;
518             }
519              
520             #-- public attributes ----------------------------------------------------------
521              
522             =head2 METHODS
523              
524             The following methods are defined for the C class:
525              
526             =cut
527              
528             #-- public methods -------------------------------------------------------------
529              
530             =head3 C<get_known_servers>
531              
532             Returns the list of known Kafka servers (in host:port or [IPv6_host]:port format).
533              
534             =cut
535             sub get_known_servers {
536 145     145 1 3634 my ( $self ) = @_;
537              
538 145         337 return keys %{ $self->{_IO_cache} };
  145         817  
539             }
540              
541             sub _get_api_versions {
542 10124     10124   16161 my ( $self, $server ) = @_;
543              
544 10124         13980 my $server_metadata = $self->{_IO_cache}->{$server};
545 10124 50       17903 defined $server_metadata
546             or die "Fatal error: server '$server' is unknown in IO cache, which should not happen";
547              
548             # if we have cached data, just use it
549             defined $server_metadata->{_api_versions}
550 10124 100       25072 and return $server_metadata->{_api_versions};
551              
552             # no cached data. Initialize empty one
553 96         358 my $server_api_versions = $server_metadata->{_api_versions} = {};
554              
555             # use empty data if client doesn't want to detect API versions
556             $self->{dont_load_supported_api_versions}
557 96 50       673 and return $server_api_versions;
558              
559             # call the server and try to get the supported API versions
560 0         0 my $api_versions = [];
561 0         0 my $error;
562             try {
563             # The ApiVersions API endpoint is only supported on Kafka versions >
564             # 0.10.0.0 so this call may fail. We simply ignore this failure and
565             # carry on.
566 0     0   0 $api_versions = $self->_get_supported_api_versions( $server );
567             }
568             catch {
569 0     0   0 $error = $_;
570 0         0 };
571              
572 0 0       0 if( defined $error ) {
573 0 0 0     0 if ( blessed( $error ) && $error->isa( 'Kafka::Exception' ) ) {
574 0 0       0 if( $error->code == $ERROR_MISMATCH_ARGUMENT ) {
575             # rethrow known fatal errors
576 0         0 die $error;
577             }
578 0         0 $self->_remember_nonfatal_error( $error->code, $error, $server );
579             } else {
580 0         0 die $error;
581             }
582             }
583              
584 0         0 foreach my $element (@$api_versions) {
585             # we want to choose which api version to use for each API call. We
586             # try to use the max version that the server supports, with
587             # fallback to the max version the protocol implements. If it's
588             # lower than the min version the kafka server supports, we set it
589             # to -1. If this API endpoint is called, it'll die.
590 0         0 my $kafka_min_version = $element->{MinVersion};
591 0         0 my $kafka_max_version = $element->{MaxVersion};
592 0         0 my $api_key = $element->{ApiKey};
593 0   0     0 my $implemented_max_version = $IMPLEMENTED_APIVERSIONS->{$api_key} // -1;
594 0         0 my $version = $kafka_max_version;
595 0 0       0 $version > $implemented_max_version
596             and $version = $implemented_max_version;
597 0 0       0 $version < $kafka_min_version
598             and $version = -1;
599 0         0 $server_api_versions->{$api_key} = $version;
600             }
601              
602 0         0 return $server_api_versions;
603             }
604              
605             # Returns the list of supported API versions. *Warning*: this call
606             # works only against Kafka 0.10.0.0 or later.
607              
608             sub _get_supported_api_versions {
609 0     0   0 my ( $self, $broker ) = @_;
610              
611 0         0 my $CorrelationId = _get_CorrelationId();
612 0         0 my $decoded_request = {
613             CorrelationId => $CorrelationId,
614             ClientId => q{},
615             ApiVersion => 0,
616             };
617 0 0       0 say STDERR format_message( '[%s] apiversions request: %s',
618             scalar( localtime ),
619             $decoded_request,
620             ) if $self->debug_level;
621 0         0 my $encoded_request = $protocol{ $APIKEY_APIVERSIONS }->{encode}->( $decoded_request );
622              
623 0         0 my $encoded_response_ref;
624              
625             # receive apiversions. We use a code block because it's actually a loop where
626             # you can do last.
627             {
628 0 0       0 $self->_connectIO( $broker )
  0         0  
629             or last;
630 0 0       0 my $sent = $self->_sendIO( $broker, $encoded_request )
631             or last;
632 0         0 $encoded_response_ref = $self->_receiveIO( $broker );
633             }
634              
635 0 0       0 unless ( $encoded_response_ref ) {
636             # NOTE: it is possible to repeat the operation here
637 0         0 $self->_error( $ERROR_CANNOT_RECV );
638             }
639              
640 0         0 my $decoded_response = $protocol{ $APIKEY_APIVERSIONS }->{decode}->( $encoded_response_ref );
641 0 0       0 say STDERR format_message( '[%s] apiversions response: %s',
642             scalar( localtime ),
643             $decoded_response,
644             ) if $self->debug_level;
645 0 0 0     0 ( defined( $decoded_response->{CorrelationId} ) && $decoded_response->{CorrelationId} == $CorrelationId )
646             # FATAL error
647             or $self->_error( $ERROR_MISMATCH_CORRELATIONID );
648 0         0 my $ErrorCode = $decoded_response->{ErrorCode};
649              
650             # If we asked a Kafka < 0.10 broker, the call is not
651             # implemented and this dies.
652 0 0       0 $ErrorCode == $ERROR_NO_ERROR
653             or $self->_error($ErrorCode);
654              
655 0         0 my $api_versions = $decoded_response->{ApiVersions};
656 0         0 return $api_versions;
657             }
658              
659             =head3 C<get_metadata( $topic )>
660              
661             If C<$topic> is present, it must be a non-false string of non-zero length.
662              
663             If C<$topic> is absent, this method returns metadata for all topics.
664              
665             Updates kafka cluster's metadata description and returns the hash reference to metadata,
666             which can be schematically described as:
667              
668             {
669             TopicName => {
670             Partition => {
671             'Leader' => ...,
672             'Replicas' => [
673             ...,
674             ],
675             'Isr' => [
676             ...,
677             ],
678             },
679             ...,
680             },
681             ...,
682             }
683              
684             Consult Kafka "Wire protocol" documentation for more details about metadata structure.
685              
686             =cut
687             sub get_metadata {
688 1     1 1 1130 my ( $self, $topic ) = @_;
689              
690 1 50 33     19 $self->_error( $ERROR_MISMATCH_ARGUMENT, 'topic' )
      33        
      33        
691             unless !defined( $topic ) || ( ( $topic eq q{} || defined( _STRING( $topic ) ) ) && !utf8::is_utf8( $topic ) );
692              
693 1 50       5 $self->_update_metadata( $topic )
694             # FATAL error
695             or $self->_error( $ERROR_CANNOT_GET_METADATA, format_message( "topic='%s'", $topic ) );
696              
697 1         3 my $clone;
698 1 50       3 if ( defined $topic ) {
699             $clone = {
700 1         51 $topic => dclone( $self->{_metadata}->{ $topic } )
701             };
702             } else {
703 0         0 $clone = dclone( $self->{_metadata} );
704             }
705              
706 1         4 return $clone;
707             }
708              
709             =head3 C
710              
711             Returns true, if C<$server> (host:port or [IPv6_host]:port) is known in cluster.
712              
713             =cut
714             sub is_server_known {
715 43     43 1 913 my ( $self, $server ) = @_;
716              
717 43 100       91 $self->_error( $ERROR_MISMATCH_ARGUMENT )
718             unless $self->_is_like_server( $server );
719              
720 5         24 return exists $self->{_IO_cache}->{ $server };
721             }
722              
723             # Returns true, if known C<$server> (host:port or [IPv6_host]:port) is accessible.
724             # Checks the accessibility of the server.
725             # This is evil: opens and closes a NEW connection immediately so do not use unless there is a strong reason for it.
726             sub _is_server_alive {
727 22     22   2149 my ( $self, $server ) = @_;
728              
729 22 100       45 $self->_error( $ERROR_MISMATCH_ARGUMENT )
730             unless $self->_is_like_server( $server );
731              
732 3 50       8 $self->_error( $ERROR_NO_KNOWN_BROKERS, 'has not yet received the metadata?' )
733             unless $self->get_known_servers;
734              
735 3         7 my $io_cache = $self->{_IO_cache};
736             $self->_error( $ERROR_MISMATCH_ARGUMENT, format_message( "Unknown server '%s' (is not found in the metadata)", $server ) )
737 3 100       10 unless exists( $io_cache->{ $server } );
738              
739 2 50       5 if ( my $io = $self->_connectIO( $server ) ) {
740 2         6 return $io->_is_alive;
741             } else {
742 0         0 return;
743             }
744             }
745              
746             # this is evil, do not use unless there is a very strong reason for it
747             sub _is_server_connected {
748 33     33   1952 my ( $self, $server ) = @_;
749              
750 33 100       81 $self->_error( $ERROR_MISMATCH_ARGUMENT )
751             unless $self->_is_like_server( $server );
752              
753 14         24 my $io_cache = $self->{_IO_cache};
754 14         18 my $io;
755 14 100 66     49 unless ( exists( $io_cache->{ $server } ) && ( $io = $io_cache->{ $server }->{IO} ) ) {
756 8         26 return;
757             }
758              
759 6         18 return $io->_is_alive;
760             }
761              
762             =head3 C
763              
764             =over 3
765              
766             =item C<$request>
767              
768             C<$request> is a reference to the hash representing
769             the structure of the request.
770              
771             This method encodes C<$request>, passes it to the leader of cluster, receives reply, decodes and returns
772             it in a form of hash reference.
773              
774             =back
775              
776             WARNING:
777              
778             =over 3
779              
780             =item *
781              
782             This method should be considered private and should not be called by an end user.
783              
784             =item *
785              
786             In order to achieve better performance, this method does not perform arguments validation.
787              
788             =back
789              
790             =over 3
791              
792             =item C<$compression_codec>
793              
794             Optional.
795              
796             C<$compression_codec> sets the required type of C<$messages> compression,
797             if the compression is desirable.
798              
799             Supported codecs:
800             L<$COMPRESSION_NONE|Kafka/$COMPRESSION_NONE>,
801             L<$COMPRESSION_GZIP|Kafka/$COMPRESSION_GZIP>,
802             L<$COMPRESSION_SNAPPY|Kafka/$COMPRESSION_SNAPPY>.
803              
804             =back
805              
806             =cut
807             sub receive_response_to_request {
808 10098     10098 1 24795 my ( $self, $request, $compression_codec, $response_timeout ) = @_;
809              
810 10098         15327 my $api_key = $request->{ApiKey};
811              
812 10098   50     30090 my $host_to_send_to = $request->{__send_to__} // 'leader';
813              
814             # WARNING: The current version of the module limited to the following:
815             # supports queries with only one combination of topic + partition (first and only).
816              
817 10098         13806 my $topic_data = $request->{topics}->[0];
818 10098         12507 my $topic_name = $topic_data->{TopicName};
819 10098         14115 my $partition = $topic_data->{partitions}->[0]->{Partition};
820              
821 10098 100 33     11198 if (
      33        
      66        
822 10098         60178 !%{ $self->{_metadata} } # the first request
823             || ( !$self->{AutoCreateTopicsEnable} && defined( $topic_name ) && !exists( $self->{_metadata}->{ $topic_name } ) )
824             ) {
825 52 100       269 $self->_update_metadata( $topic_name ) # hash metadata could be updated
826             # FATAL error
827             or $self->_error( $ERROR_CANNOT_GET_METADATA, format_message( "topic='%s'", $topic_name ), request => $request )
828             ;
829             }
830              
831 10095 50       22607 $request->{CorrelationId} = _get_CorrelationId() unless exists $request->{CorrelationId};
832              
833 10095 50       22222 say STDERR format_message( '[%s] compression_codec=%s, request=%s',
834             scalar( localtime ),
835             $compression_codec,
836             $request,
837             ) if $self->debug_level;
838              
839 10095         14669 my( $ErrorCode, $partition_data, $io_error );
840              
841 10095         11126 my $attempt = 0;
842             # we save the original api version of the request, because in the attempt
843             # loop we might be trying different brokers which may support different api
844             # versions.
845 10095         13320 my $original_request_api_version = $request->{ApiVersion};
846 10095   50     25828 ATTEMPT: while ( ++$attempt <= ( $self->{SEND_MAX_ATTEMPTS} // 1 ) ) {
847 10146         13964 $ErrorCode = $ERROR_NO_ERROR;
848 10146         12253 undef $io_error;
849              
850 10146         12409 my $server;
851 10146 50       17082 if ($host_to_send_to eq 'leader') {
    0          
852             # hash metadata could be updated
853 10146         17415 my $leader = $self->{_metadata}->{ $topic_name }->{ $partition }->{Leader};
854 10146 50       17493 next ATTEMPT unless defined $leader;
855              
856 10146         15061 $server = $self->{_leaders}->{ $leader };
857 10146 50       17421 unless ( $server ) {
858 0         0 $ErrorCode = $ERROR_LEADER_NOT_FOUND;
859 0         0 $self->_remember_nonfatal_error( $ErrorCode, $ERROR{ $ErrorCode }, $server, $topic_name, $partition );
860 0         0 next ATTEMPT;
861             }
862             } elsif ( $host_to_send_to eq 'group_coordinator') {
863 0         0 my $group_id = $request->{GroupId};
864 0 0 0     0 if ( !%{ $self->{_group_coordinators} } && defined $group_id) {
  0         0  
865             # first request
866 0         0 $self->_update_group_coordinators($group_id);
867             }
868 0         0 $server = $self->{_group_coordinators}->{$group_id};
869 0 0       0 unless ( $server ) {
870 0         0 $ErrorCode = $ERROR_GROUP_COORDINATOR_NOT_FOUND;
871 0         0 $self->_remember_nonfatal_error( $ErrorCode, $ERROR{ $ErrorCode }, $server, $topic_name, $partition );
872 0         0 next ATTEMPT;
873             }
874             } else {
875 0         0 die "__send_to__ must be either 'leader', 'group_coordinator', or void (will default to 'leader')";
876             }
877              
878             # Send a request to the server
879 10146 100       18753 if ( $self->_connectIO( $server ) ) {
880             # we can connect to this server, so let's detect the api versions
881             # it and use whatever it supports, except if the request forces us
882             # to use an api version. Warning, the version might end up being
883             # undef if detection against the Kafka server failed, or if
884             # dont_load_supported_api_versions is true. However the Encoder
885             # code knows how to handle it.
886 10142         16330 $request->{ApiVersion} = $original_request_api_version;
887 10142 100       17742 unless( defined $request->{ApiVersion} ) {
888 10124         21264 $request->{ApiVersion} = $self->_get_api_versions( $server )->{ $api_key };
889             # API versions request may fail and the server may be disconnected
890 10124 50       18152 unless( $self->_is_IO_connected( $server ) ) {
891             # this attempt does not count, assuming that _get_api_versions will not try to get them from failing broker again
892 0         0 redo ATTEMPT;
893             }
894             }
895              
896 10142         29480 my $encoded_request = $protocol{ $api_key }->{encode}->( $request, $compression_codec );
897              
898 10142 100       23580 unless ( $self->_sendIO( $server, $encoded_request ) ) {
899 2         9 $io_error = $self->_io_error( $server );
900 2 50       10 $ErrorCode = $io_error ? $io_error->code : $ERROR_CANNOT_SEND;
901 2         71 $self->_closeIO( $server, 1 );
902             }
903             }
904             else {
905 4         16 $io_error = $self->_io_error( $server );
906 4 50       19 $ErrorCode = $io_error ? $io_error->code : $ERROR_CANNOT_BIND;
907             }
908              
909 10146 100       19635 if ( $ErrorCode != $ERROR_NO_ERROR ) {
910             # could not send request due to non-fatal IO error (fatal errors should be thrown by connectIO/sendIO already)
911 6         24 $self->_remember_nonfatal_error( $ErrorCode, $self->_io_error( $server ), $server, $topic_name, $partition );
912 6 50 33     50 if( $api_key == $APIKEY_PRODUCE && !( $ErrorCode == $ERROR_CANNOT_BIND || $ErrorCode == $ERROR_NO_CONNECTION ) ) {
      66        
913             # do not retry failed produce requests which may have sent some data already
914 0         0 $ErrorCode = $ERROR_CANNOT_SEND;
915 0         0 last ATTEMPT;
916             }
917 6         23 next ATTEMPT;
918             }
919              
920 10140         13567 my $response;
921 10140 100 100     33036 if ( $api_key == $APIKEY_PRODUCE && $request->{RequiredAcks} == $NOT_SEND_ANY_RESPONSE ) {
922             # Do not receive a response, self-forming own response
923             $response = {
924             CorrelationId => $request->{CorrelationId},
925 5009         23603 topics => [
926             {
927             TopicName => $topic_name,
928             partitions => [
929             {
930             Partition => $partition,
931             ErrorCode => 0,
932             Offset => $BAD_OFFSET,
933             },
934             ],
935             },
936             ],
937             };
938             } else {
939 5131         10729 my $encoded_response_ref = $self->_receiveIO( $server, $response_timeout );
940 5131 100       10807 unless ( $encoded_response_ref ) {
941 2 100       7 if ( $api_key == $APIKEY_PRODUCE ) {
942             # WARNING: Unfortunately, the sent package (one or more messages) does not have a unique identifier
943             # and there is no way to verify the delivery of data
944 1         3 $ErrorCode = $ERROR_SEND_NO_ACK;
945              
946             # Should not be allowed to re-send data on the next attempt
947             # FATAL error
948 1         6 $self->_error( $ErrorCode, "no ack for request", io_error => $self->_io_error( $server ), request => $request );
949 0         0 last ATTEMPT;
950             } else {
951 1         4 $ErrorCode = $ERROR_CANNOT_RECV;
952 1         6 $self->_remember_nonfatal_error( $ErrorCode, $self->_io_error( $server ), $server, $topic_name, $partition );
953 1         3 next ATTEMPT;
954             }
955             }
956 5129 100       9553 if ( length( $$encoded_response_ref ) > 4 ) { # MessageSize => int32
957             # we also pass the api version that was used for the request,
958             # so that we know how to decode the response
959 5128         16806 $response = $protocol{ $api_key }->{decode}->( $encoded_response_ref, $request->{ApiVersion} );
960 5128 50       14388 say STDERR format_message( '[%s] response: %s',
961             scalar( localtime ),
962             $response,
963             ) if $self->debug_level;
964             } else {
965 1         6 $self->_error( $ERROR_RESPONSEMESSAGE_NOT_RECEIVED, format_message("response length=%s", length( $$encoded_response_ref ) ), io_error => $self->_io_error( $server ), request => $request );
966             }
967             }
968              
969             # FATAL error if correllation does not match
970             $self->_error( $ERROR_MISMATCH_CORRELATIONID, "$response->{CorrelationId} != $request->{CorrelationId}", request => $request, response => $response )
971             unless $response->{CorrelationId} == $request->{CorrelationId}
972 10137 50       22475 ;
973 10137         14060 $topic_data = $response->{topics}->[0];
974 10137 100       23247 $partition_data = $topic_data->{ $api_key == $APIKEY_OFFSET ? 'PartitionOffsets' : 'partitions' }->[0];
975              
976 10137         13627 $ErrorCode = $partition_data->{ErrorCode};
977              
978 10137 100       35168 return $response if $ErrorCode == $ERROR_NO_ERROR; # success
979              
980 84 50 33     638 if( $api_key == $APIKEY_PRODUCE && $ErrorCode == $ERROR_REQUEST_TIMED_OUT ) {
981             # special case: produce request timed out so we did not get expected ACK and should not retry sending request again
982             # Should not be allowed to re-send data on the next attempt
983             # FATAL error
984 0         0 $self->_error( $ERROR_SEND_NO_ACK, format_message( "topic='%s', partition=%s response error: %s", $topic_name, $partition, $ErrorCode ), request => $request, response => $response );
985 0         0 last ATTEMPT;
986             }
987              
988 84 100       451 if ( exists $RETRY_ON_ERRORS{ $ErrorCode } ) {
989 64         473 $self->_remember_nonfatal_error( $ErrorCode, $ERROR{ $ErrorCode }, $server, $topic_name, $partition );
990 64         343 next ATTEMPT;
991             }
992              
993             # FATAL error
994 20         107 $self->_error( $ErrorCode, format_message( "topic='%s', partition=%s", $topic_name, $partition ), request => $request );
995             } continue {
996             # Expect to possible changes in the situation, such as restoration of connection
997             say STDERR format_message( '[%s] sleeping for %d ms before making request attempt #%d (%s)',
998             scalar( localtime ),
999             $self->{RETRY_BACKOFF},
1000 71 0       306 $attempt + 1,
    50          
1001             $ErrorCode == $ERROR_NO_ERROR ? 'refreshing metadata' : "ErrorCode ${ErrorCode}",
1002             ) if $self->debug_level;
1003              
1004 71         28228572 Time::HiRes::sleep( $self->{RETRY_BACKOFF} / 1000 );
1005              
1006 71 100 33     1213 $self->_update_metadata( $topic_name )
1007             # FATAL error
1008             or $self->_error( $ErrorCode || $ERROR_CANNOT_GET_METADATA, format_message( "topic='%s', partition=%s", $topic_name, $partition ), request => $request )
1009             ;
1010 68 50       692 if ( $host_to_send_to eq 'group_coordinator') {
1011             $self->_update_group_coordinators($request->{GroupId})
1012 0         0 }
1013             }
1014              
1015             # FATAL error
1016 17 50       105 if ( $ErrorCode ) {
1017 17 100       201 $self->_error( $ErrorCode, format_message( "topic='%s'%s", $topic_data->{TopicName}, $partition_data ? ", partition = ".$partition_data->{Partition} : '' ), request => $request, io_error => $io_error );
1018             } else {
1019 0         0 $self->_error( $ERROR_UNKNOWN_TOPIC_OR_PARTITION, format_message( "topic='%s', partition=%s", $topic_name, $partition ), request => $request, io_error => $io_error );
1020             }
1021              
1022 0         0 return;
1023             }
1024              
1025             =head3 C
1026              
1027             Returns true if the metadata contains information about specified combination of topic and partition.
1028             Otherwise returns false.
1029              
1030             C takes the following arguments:
1031              
1032             =over 3
1033              
1034             =item C<$topic>
1035              
1036             The C<$topic> must be a normal non-false string of non-zero length.
1037              
1038             =item C<$partition>
1039              
1040             =back
1041              
1042             =cut
1043             sub exists_topic_partition {
1044 3     3 1 700 my ( $self, $topic, $partition ) = @_;
1045              
1046 3 50 33     26 $self->_error( $ERROR_MISMATCH_ARGUMENT, 'topic' )
      33        
      33        
1047             unless defined( $topic ) && ( $topic eq q{} || defined( _STRING( $topic ) ) ) && !utf8::is_utf8( $topic );
1048 3 50 33     19 $self->_error( $ERROR_MISMATCH_ARGUMENT, 'partition' )
      33        
1049             unless defined( $partition ) && isint( $partition ) && $partition >= 0;
1050              
1051 3 50       5 unless ( %{ $self->{_metadata} } ) { # the first request
  3         11  
1052 0 0       0 $self->_update_metadata( $topic ) # hash metadata could be updated
1053             # FATAL error
1054             or $self->_error( $ERROR_CANNOT_GET_METADATA, format_message( "topic='%s'", $topic ) );
1055             }
1056              
1057 3         11 return exists $self->{_metadata}->{ $topic }->{ $partition };
1058             }
1059              
1060             =head3 C
1061              
1062             Closes connection with C<$server> (defined as host:port or [IPv6_host]:port).
1063              
1064             =cut
1065             sub close_connection {
1066 22     22 1 1906 my ( $self, $server ) = @_;
1067              
1068 22 50       51 unless ( $self->is_server_known( $server ) ) {
1069 0         0 return;
1070             }
1071              
1072 3         9 $self->_closeIO( $server );
1073 3         9 return 1;
1074             }
1075              
1076             =head3 C
1077              
1078             Closes connection with all known Kafka servers.
1079              
1080             =cut
1081             sub close {
1082 7     7 1 57273 my ( $self ) = @_;
1083              
1084 7         30 foreach my $server ( $self->get_known_servers ) {
1085 19         104 $self->_closeIO( $server );
1086             }
1087              
1088 7         66 return;
1089             }
1090              
1091             =head3 C
1092              
1093             Returns a reference to a hash.
1094              
1095             Each hash key is the identifier of the server (host:port or [IPv6_host]:port), and the value is the last communication error
1096             with that server.
1097              
1098             An empty hash is returned if there were no communication errors.
1099              
1100             =cut
1101             sub cluster_errors {
1102 2     2 1 1810 my ( $self ) = @_;
1103              
1104 2         3 my %errors;
1105 2         9 foreach my $server ( $self->get_known_servers ) {
1106 4 100       12 if ( my $error = $self->_io_error( $server ) ) {
1107 3         23 $errors{ $server } = $error;
1108             }
1109             }
1110              
1111 2         8 return \%errors;
1112             }
1113              
1114             =head3 C
1115              
1116             Returns a reference to an array of the last non-fatal errors.
1117              
1118             Maximum number of entries is set using C parameter of L.
1119              
1120             A reference to the empty array is returned if there were no non-fatal errors or parameter C
1121             is set to 0.
1122              
1123             =cut
1124             sub nonfatal_errors {
1125 81     81 1 131283 my ( $self ) = @_;
1126              
1127 81         528 return $self->{_nonfatal_errors};
1128             }
1129              
1130             =head3 C
1131              
1132             Clears an array of the last non-fatal errors.
1133              
1134             A reference to the empty array is returned because there are no non-fatal errors now.
1135              
1136             =cut
1137             sub clear_nonfatals {
1138 0     0 1 0 my ( $self ) = @_;
1139              
1140 0         0 @{ $self->{_nonfatal_errors} } = ();
  0         0  
1141              
1142 0         0 return $self->{_nonfatal_errors};
1143             }
1144              
1145             #-- private attributes ---------------------------------------------------------
1146              
1147             #-- private functions ----------------------------------------------------------
1148              
1149             sub _split_host_port {
1150 372     372   698 my ( $server ) = @_;
1151              
1152 372         1960 my ( $host, $port ) = $server=~ /^(.+):(\d+)$/;
1153 372 50 66     1507 $host = $1 if $host && $host =~ /^\[(.+)\]$/;
1154              
1155 372         1248 return( $host, $port );
1156             }
1157              
1158             #-- private methods ------------------------------------------------------------
1159              
1160             # Remember non-fatal error
1161             sub _remember_nonfatal_error {
1162 71     71   403 my ( $self, $error_code, $error, $server, $topic, $partition ) = @_;
1163              
1164             my $max_logged_errors = $self->{MaxLoggedErrors}
1165 71 50       360 or return;
1166              
1167 0         0 shift( @{ $self->{_nonfatal_errors} } )
1168 71 50       149 if scalar( @{ $self->{_nonfatal_errors} } ) == $max_logged_errors;
  71         342  
1169             my $msg = format_message( "[%s] Non-fatal error: %s (ErrorCode %s, server '%s', topic '%s', partition %s)",
1170             scalar( localtime ),
1171 71 0 0     5886 $error // ( defined( $error_code ) && exists( $ERROR{ $error_code } ) ? $ERROR{ $error_code } : '' ),
      33        
      50        
1172             $error_code // 'IO error',
1173             $server,
1174             $topic,
1175             $partition,
1176             );
1177              
1178 71 50       391 say STDERR $msg
1179             if $self->debug_level;
1180              
1181 71         199 push @{ $self->{_nonfatal_errors} }, $msg;
  71         307  
1182              
1183 71         217 return $msg;
1184             }
1185              
1186             # Returns identifier of the cluster leader (host:port or [IPv6_host]:port)
1187             sub _find_leader_server {
1188 118     118   414 my ( $self, $node_id ) = @_;
1189              
1190 118         230 my $leader_server;
1191 118         295 my $IO_cache = $self->{_IO_cache};
1192 118         283 my $NodeId;
1193 118         476 foreach my $server ( keys %$IO_cache ) {
1194 250         605 $NodeId = $IO_cache->{ $server }->{NodeId};
1195 250 100 66     1588 if ( defined( $NodeId ) && $NodeId == $node_id ) {
1196 118         310 $leader_server = $server;
1197 118         332 last;
1198             }
1199             }
1200              
1201 118         713 return $leader_server;
1202             }
1203              
1204             # Form a list of servers to attempt querying of the metadata
1205             sub _get_interviewed_servers {
1206 124     124   424 my ( $self ) = @_;
1207              
1208 124         401 my ( @priority, @secondary, @rest );
1209 124         415 my $IO_cache = $self->{_IO_cache};
1210 124         235 my $server_data;
1211 124         781 foreach my $server ( $self->get_known_servers ) {
1212 266         806 $server_data = $IO_cache->{ $server };
1213 266 100       1179 if ( defined $server_data->{NodeId} ) {
1214 213 100       706 if ( $server_data->{IO} ) {
1215 135         520 push @priority, $server;
1216             } else {
1217 78         236 push @secondary, $server;
1218             }
1219             } else {
1220 53         173 push @rest, $server;
1221             }
1222             }
1223              
1224 124         1172 return( shuffle( @priority ), shuffle( @secondary ), shuffle( @rest ) );
1225             }
1226              
1227             # Refresh group_coordinators for given topic
1228             sub _update_group_coordinators {
1229 0     0   0 my ($self, $group_id) = @_;
1230              
1231 0         0 my $CorrelationId = _get_CorrelationId();
1232 0         0 my $decoded_request = {
1233             CorrelationId => $CorrelationId,
1234             ClientId => q{},
1235             CoordinatorKey => $group_id,
1236             CoordinatorType => 0, # type is group
1237             };
1238 0 0       0 say STDERR format_message( '[%s] group coordinators request: %s',
1239             scalar( localtime ),
1240             $decoded_request,
1241             ) if $self->debug_level;
1242 0         0 my $encoded_request = $protocol{ $APIKEY_FINDCOORDINATOR }->{encode}->( $decoded_request );
1243              
1244 0         0 my $encoded_response_ref;
1245 0         0 my @brokers = $self->_get_interviewed_servers;
1246              
1247             # receive coordinator data
1248 0         0 foreach my $broker ( @brokers ) {
1249 0 0 0     0 last if $self->_connectIO( $broker )
      0        
1250             && $self->_sendIO( $broker, $encoded_request )
1251             && ( $encoded_response_ref = $self->_receiveIO( $broker ) );
1252             }
1253              
1254 0 0       0 unless ( $encoded_response_ref ) {
1255             # NOTE: it is possible to repeat the operation here
1256 0         0 return;
1257             }
1258              
1259 0         0 my $decoded_response = $protocol{ $APIKEY_FINDCOORDINATOR }->{decode}->( $encoded_response_ref );
1260 0 0       0 say STDERR format_message( '[%s] group coordinators: %s',
1261             scalar( localtime ),
1262             $decoded_response,
1263             ) if $self->debug_level;
1264 0 0 0     0 ( defined( $decoded_response->{CorrelationId} ) && $decoded_response->{CorrelationId} == $CorrelationId )
1265             # FATAL error
1266             or $self->_error( $ERROR_MISMATCH_CORRELATIONID );
1267             $decoded_response->{ErrorCode}
1268 0 0       0 and $self->_error( $decoded_response->{ErrorCode} );
1269              
1270 0         0 my $IO_cache = $self->{_IO_cache};
1271 0         0 my $server = $self->_build_server_name( @{ $decoded_response }{ 'Host', 'Port' } );
  0         0  
1272             $IO_cache->{ $server } = { # can add new servers
1273             IO => $IO_cache->{ $server }->{IO}, # IO or undef
1274             NodeId => $decoded_response->{NodeId},
1275             host => $decoded_response->{Host},
1276             port => $decoded_response->{Port},
1277 0         0 };
1278 0         0 $self->{_group_coordinators}->{ $group_id } = $server;
1279              
1280 0         0 return 1;
1281             }
1282              
1283             # Refresh metadata for given topic
1284             sub _update_metadata {
1285 124     124   653 my ( $self, $topic, $is_recursive_call ) = @_;
1286              
1287 124         1052 my $CorrelationId = _get_CorrelationId();
1288 124   33     1537 my $decoded_request = {
1289             CorrelationId => $CorrelationId,
1290             ClientId => q{},
1291             topics => [
1292             $topic // (),
1293             ],
1294             };
1295 124 50       833 say STDERR format_message( '[%s] metadata request: %s',
1296             scalar( localtime ),
1297             $decoded_request,
1298             ) if $self->debug_level;
1299 124         1415 my $encoded_request = $protocol{ $APIKEY_METADATA }->{encode}->( $decoded_request );
1300              
1301 124         370 my $encoded_response_ref;
1302 124         616 my @brokers = $self->_get_interviewed_servers;
1303              
1304             # receive metadata
1305 124         407 foreach my $broker ( @brokers ) {
1306 130 100 66     936 last if $self->_connectIO( $broker )
      100        
1307             && $self->_sendIO( $broker, $encoded_request )
1308             && ( $encoded_response_ref = $self->_receiveIO( $broker ) );
1309             }
1310              
1311 123 100       485 unless ( $encoded_response_ref ) {
1312             # NOTE: it is possible to repeat the operation here
1313 5         49 return;
1314             }
1315              
1316 118         791 my $decoded_response = $protocol{ $APIKEY_METADATA }->{decode}->( $encoded_response_ref );
1317 118 50       563 say STDERR format_message( '[%s] metadata response: %s',
1318             scalar( localtime ),
1319             $decoded_response,
1320             ) if $self->debug_level;
1321 118 50 33     868 ( defined( $decoded_response->{CorrelationId} ) && $decoded_response->{CorrelationId} == $CorrelationId )
1322             # FATAL error
1323             or $self->_error( $ERROR_MISMATCH_CORRELATIONID );
1324              
1325 118 50       646 unless ( _ARRAY( $decoded_response->{Broker} ) ) {
1326 0 0       0 if ( $self->{AutoCreateTopicsEnable} ) {
1327 0         0 return $self->_attempt_update_metadata( $is_recursive_call, $topic, undef, $ERROR_NO_KNOWN_BROKERS );
1328             } else {
1329             # FATAL error
1330 0         0 $self->_error( $ERROR_NO_KNOWN_BROKERS, format_message( "topic='%s'", $topic ) );
1331             }
1332             }
1333              
1334 118         383 my $IO_cache = $self->{_IO_cache};
1335              
1336             # Clear the previous information about the NodeId in the IO cache
1337 118         696 $IO_cache->{ $_ }->{NodeId} = undef for @brokers;
1338              
1339             # In the IO cache update/add obtained server information
1340 118         301 foreach my $received_broker ( @{ $decoded_response->{Broker} } ) {
  118         419  
1341 354         750 my $server = $self->_build_server_name( @{ $received_broker }{ 'Host', 'Port' } );
  354         1116  
1342             $IO_cache->{ $server } = { # can add new servers
1343             IO => $IO_cache->{ $server }->{IO}, # IO or undef
1344             NodeId => $received_broker->{NodeId},
1345             host => $received_broker->{Host},
1346             port => $received_broker->{Port},
1347 354         8928 };
1348             }
1349              
1350             #NOTE: IO cache does not remove server that's missing in metadata
1351              
1352             # Collect the received metadata
1353 118         356 my $received_metadata = {};
1354 118         350 my $leaders = {};
1355              
1356 118         346 my $ErrorCode = $ERROR_NO_ERROR;
1357 118         322 my( $TopicName, $partition );
1358             METADATA_CREATION:
1359 118         256 foreach my $topic_metadata ( @{ $decoded_response->{TopicMetadata} } ) {
  118         412  
1360 118         302 $TopicName = $topic_metadata->{TopicName};
1361 118         291 undef $partition;
1362             last METADATA_CREATION
1363 118 50       493 if ( $ErrorCode = $topic_metadata->{ErrorCode} ) != $ERROR_NO_ERROR;
1364              
1365 118         269 foreach my $partition_metadata ( @{ $topic_metadata->{PartitionMetadata} } ) {
  118         358  
1366 118         307 $partition = $partition_metadata->{Partition};
1367             last METADATA_CREATION
1368 118 50 33     512 if ( $ErrorCode = $partition_metadata->{ErrorCode} ) != $ERROR_NO_ERROR
1369             && $ErrorCode != $ERROR_REPLICA_NOT_AVAILABLE;
1370 118         486 $ErrorCode = $ERROR_NO_ERROR;
1371              
1372 118         967 my $received_partition_data = $received_metadata->{ $TopicName }->{ $partition } = {};
1373 118         424 my $leader = $received_partition_data->{Leader} = $partition_metadata->{Leader};
1374 118         430 $received_partition_data->{Replicas} = [ @{ $partition_metadata->{Replicas} } ];
  118         561  
1375 118         406 $received_partition_data->{Isr} = [ @{ $partition_metadata->{Isr} } ];
  118         452  
1376              
1377 118         589 $leaders->{ $leader } = $self->_find_leader_server( $leader );
1378             }
1379             }
1380 118 50       452 if ( $ErrorCode != $ERROR_NO_ERROR ) {
1381 0 0       0 if ( exists $RETRY_ON_ERRORS{ $ErrorCode } ) {
1382 0         0 return $self->_attempt_update_metadata( $is_recursive_call, $TopicName, $partition, $ErrorCode );
1383             } else {
1384             # FATAL error
1385 0 0       0 $self->_error( $ErrorCode, format_message( "topic='%s'%s", $TopicName, defined( $partition ) ? ", partition=$partition" : '' ) );
1386             }
1387             }
1388              
1389             # Update metadata for received topics
1390 118         357 $self->{_metadata}->{ $_ } = $received_metadata->{ $_ } foreach keys %{ $received_metadata };
  118         952  
1391 118         354 $self->{_leaders}->{ $_ } = $leaders->{ $_ } foreach keys %{ $leaders };
  118         729  
1392              
1393 118         1483 return 1;
1394             }
1395              
# Retries the metadata update after a non-fatal error.
#
# Arguments:
#   $is_recursive_call - true when called from inside a retry that is already
#                        running; then nothing is retried and the method
#                        returns empty so the outer retry loop continues
#   $topic, $partition - what the failed request was about (used for the
#                        remembered error and the final error message)
#   $error_code        - the error that triggered the retry
#
# The error is first recorded via _remember_nonfatal_error.  Then, up to
# SEND_MAX_ATTEMPTS times, it sleeps RETRY_BACKOFF milliseconds and calls
# _update_metadata( $topic, 1 ); the first true result makes it return 1.
# When every retry fails, _error is invoked with $error_code.
sub _attempt_update_metadata {
    my ( $self, $is_recursive_call, $topic, $partition, $error_code ) = @_;

    # A nested call must not start a retry loop of its own.
    return if $is_recursive_call;

    $self->_remember_nonfatal_error( $error_code, $ERROR{ $error_code }, undef, $topic, $partition );

    my $retries_left = $self->{SEND_MAX_ATTEMPTS};
    ATTEMPTS:
    while ( $retries_left-- ) {
        if ( $self->debug_level ) {
            # The first retry is reported as #2 (presumably the failed
            # initial request counts as #1).
            my $attempt_number = $self->{SEND_MAX_ATTEMPTS} - $retries_left + 1;
            say STDERR format_message( '[%s] sleeping for %d ms before making update metadata attempt #%d',
                scalar( localtime ),
                $self->{RETRY_BACKOFF},
                $attempt_number,
            );
        }

        Time::HiRes::sleep( $self->{RETRY_BACKOFF} / 1000 );

        if ( $self->_update_metadata( $topic, 1 ) ) {
            return 1;
        }
    }

    # All retries exhausted: FATAL error
    my $partition_suffix = defined( $partition ) ? ", partition=$partition" : '';
    $self->_error( $error_code, format_message( "topic='%s'%s", $topic, $partition_suffix ) );

    return;
}
1419              
# Builds the "host:port" key used for a server in the IO cache.
# IPv6 literals are wrapped in square brackets ("[::1]:9092") so the colon
# before the port stays unambiguous.
sub _build_server_name {
    my ( $self, $host, $port ) = @_;

    my $printable_host = is_ipv6( $host ) ? "[$host]" : $host;

    return "$printable_host:$port";
}
1428              
# Records an I/O failure for a cached server and drops its connection.
#
# The error is stored in $server_data->{error}, where _io_error can read it
# later.  Any open IO handle is closed and cleared.  The error is then
# re-thrown unless it is a Kafka::Exception whose code is neither
# $ERROR_MISMATCH_ARGUMENT nor $ERROR_INCOMPATIBLE_HOST_IP_VERSION.  Only
# those remaining Kafka exceptions are swallowed, and the method returns
# empty.
sub _on_io_error {
    my ( $self, $server_data, $error ) = @_;

    $server_data->{error} = $error;

    if ( my $io = $server_data->{IO} ) {
        $io->close;
        $server_data->{IO} = undef;
    }

    # Anything that is not a Kafka exception is unexpected: propagate it.
    die $error unless blessed( $error ) && $error->isa( 'Kafka::Exception' );

    # Known fatal Kafka errors are propagated too.
    my $code = $error->code;
    die $error if $code == $ERROR_MISMATCH_ARGUMENT || $code == $ERROR_INCOMPATIBLE_HOST_IP_VERSION;

    return;
}
1450              
# Returns the last I/O error stored for $server (see _on_io_error).
# Returns undef when the server is not in the IO cache or no error was
# recorded.
sub _io_error {
    my ( $self, $server ) = @_;

    my $server_data = $self->{_IO_cache}->{ $server };

    return $server_data ? $server_data->{error} : undef;
}
1459              
# Returns the cached IO handle for $server, which is true while a connection
# is open.  Returns nothing when the server is absent from the IO cache.
sub _is_IO_connected {
    my ( $self, $server ) = @_;

    my $server_data = $self->{_IO_cache}->{ $server };
    return unless $server_data;

    return $server_data->{IO};
}
1465              
# Ensures there is an open connection to $server ("host:port" or
# "[IPv6_host]:port") and returns its IO handle.
#
# An existing handle from the IO cache is reused.  Otherwise a new Kafka::IO
# is created from the cached host/port and the object's timeout/ip_version;
# on success any previously recorded error for the server is cleared.  If
# construction fails, the error goes to _on_io_error (which may re-throw it)
# and nothing is returned.  Unknown servers raise $ERROR_MISMATCH_ARGUMENT
# via _error.
sub _connectIO {
    my ( $self, $server ) = @_;

    my $server_data = $self->{_IO_cache}->{ $server };
    $self->_error( $ERROR_MISMATCH_ARGUMENT, format_message( "Unknown server '%s' (is not found in the metadata)", $server ) )
        unless $server_data;

    # Already connected: reuse the handle.
    return $server_data->{IO} if $server_data->{IO};

    my $failure;
    try {
        $server_data->{IO} = Kafka::IO->new(
            host        => $server_data->{host},
            port        => $server_data->{port},
            timeout     => $self->{timeout},
            ip_version  => $self->{ip_version},
        );
        # A fresh connection clears the previously recorded error.
        $server_data->{error} = undef;
    } catch {
        $failure = $_;
    };

    if ( defined $failure ) {
        $self->_on_io_error( $server_data, $failure );
        return;
    }

    return $server_data->{IO};
}
1495              
# Looks up $server in the IO cache and returns ( $server_data, $io ).
# Raises $ERROR_MISMATCH_ARGUMENT via _error when the server is unknown or
# has no open IO handle.
sub _server_data_IO {
    my ( $self, $server ) = @_;

    my $server_data = $self->{_IO_cache}->{ $server };
    unless ( $server_data ) {
        $self->_error( $ERROR_MISMATCH_ARGUMENT, format_message( "Unknown server '%s' (is not found in the metadata)", $server ) );
    }

    my $io = $server_data->{IO};
    unless ( $io ) {
        $self->_error( $ERROR_MISMATCH_ARGUMENT, format_message( "Server '%s' is not connected", $server ) );
    }

    return ( $server_data, $io );
}
1506              
# Sends $encoded_request over the open connection to $server.
# Returns the value of the IO object's send() call.  If send() throws, the
# error is passed to _on_io_error, which closes the connection, stores the
# error and may re-throw it.  If it does not re-throw, undef is returned.
sub _sendIO {
    my ( $self, $server, $encoded_request ) = @_;

    my ( $server_data, $io ) = $self->_server_data_IO( $server );

    my ( $sent, $failure );
    try {
        $sent = $io->send( $encoded_request );
    } catch {
        $failure = $_;
    };

    $self->_on_io_error( $server_data, $failure ) if defined $failure;

    return $sent;
}
1525              
# Receives one complete response from $server.
#
# The 4-byte size header is read first, bounded by $response_timeout when
# one is given.  Its value (a signed 32-bit big-endian integer, 'l>') is then
# used as the length of the body to read.  Returns a reference to the raw
# response, with the size header included.
#
# On an I/O failure the error is passed to _on_io_error, which closes the
# connection, records the error and may re-throw it.  If it does not
# re-throw, nothing is returned.  A partially received response (for
# example, only the size header when reading the body failed) is never
# handed back as if it were complete.
sub _receiveIO {
    my ( $self, $server, $response_timeout ) = @_;
    my( $server_data, $io ) = $self->_server_data_IO( $server );
    my $response_ref;
    my $error;
    try {
        $response_ref = $io->receive( 4, $response_timeout ); # response header must arrive within request-specific timeout if provided
        if ( $response_ref && length( $$response_ref ) == 4 ) {
            # received 4-byte response header with response size; try receiving the rest
            my $message_body_ref = $io->receive( unpack( 'l>', $$response_ref ) );
            $$response_ref .= $$message_body_ref;
        }
    } catch {
        $error = $_;
    };

    if( defined $error ) {
        $self->_on_io_error( $server_data, $error );
        # Do not return the header-only (truncated) data read before the failure.
        return;
    }

    return $response_ref;
}
1549              
# Closes the connection to $server if one is open.
# When a handle was closed, the server's recorded error is also cleared,
# unless $keep_error is true.  Unknown servers and servers without an open
# handle are left untouched.  Always returns empty.
sub _closeIO {
    my ( $self, $server, $keep_error ) = @_;

    my $server_data = $self->{_IO_cache}->{ $server } or return;
    my $io          = $server_data->{IO}              or return;

    $io->close;
    $server_data->{error} = undef unless $keep_error;
    $server_data->{IO}    = undef;

    return;
}
1564              
# Checks that $server looks like "host:port" (or "[IPv6_host]:port").
# Returns $server unchanged when all of the following hold:
#   - it is defined and passes Params::Util::_STRING;
#   - it does not have the UTF-8 flag set;
#   - its host part is a hostname, an IPv4 address or an IPv6 address;
#   - its port part is true.
# Otherwise it returns nothing.
sub _is_like_server {
    my ( $self, $server ) = @_;

    return if !defined( $server );
    return if !defined( _STRING( $server ) );
    return if utf8::is_utf8( $server );

    my ( $host, $port ) = _split_host_port( $server );

    # The host checks run before the port test, matching the original order.
    my $host_is_valid = is_hostname( $host ) || is_ipv4( $host ) || is_ipv6( $host );
    return unless $host_is_valid && $port;

    return $server;
}
1584              
# Raises a Kafka::Exception::Connection built by throw_args from the
# supplied arguments (error code first, then optional message parts).
# throw() is expected not to return.
sub _error {
    my ( $self, @args ) = @_;
    Kafka::Exception::Connection->throw( throw_args( @args ) );
}
1590              
1591             1;
1592              
1593             __END__