File Coverage

lib/Kafka/Connection.pm
Criterion Covered Total %
statement 363 472 76.9
branch 139 250 55.6
condition 74 155 47.7
subroutine 54 60 90.0
pod 11 11 100.0
total 641 948 67.6


line stmt bran cond sub pod time code
1             package Kafka::Connection;
2              
3             =head1 NAME
4              
5             Kafka::Connection - Object interface to connect to a kafka cluster.
6              
7             =head1 VERSION
8              
9             This documentation refers to C version 1.07 .
10              
11             =cut
12              
13 8     8   90878 use 5.010;
  8         23  
14 8     8   38 use strict;
  8         10  
  8         137  
15 8     8   27 use warnings;
  8         10  
  8         364  
16              
17             our $DEBUG = 0;
18              
19             our $VERSION = '1.07';
20              
21 8         306 use Exporter qw(
22             import
23 8     8   34 );
  8         11  
24             our @EXPORT = qw(
25             %RETRY_ON_ERRORS
26             );
27              
28 8         362 use Data::Validate::Domain qw(
29             is_hostname
30 8     8   1083 );
  8         38291  
31 8         521 use Data::Validate::IP qw(
32             is_ipv4
33             is_ipv6
34 8     8   1378 );
  8         109072  
35 8     8   294 use Const::Fast;
  8         1724  
  8         58  
36 8         425 use List::Util qw(
37             shuffle
38 8     8   442 );
  8         12  
39 8         533 use Params::Util qw(
40             _ARRAY
41             _ARRAY0
42             _HASH
43             _NONNEGINT
44             _NUMBER
45             _POSINT
46             _STRING
47 8     8   1217 );
  8         5905  
48 8         305 use Scalar::Util qw(
49             blessed
50 8     8   50 );
  8         17  
51 8         411 use Scalar::Util::Numeric qw(
52             isint
53 8     8   2035 );
  8         3578  
54 8         316 use Storable qw(
55             dclone
56 8     8   51 );
  8         17  
57 8     8   873 use Time::HiRes ();
  8         2736  
  8         147  
58 8     8   1136 use Try::Tiny;
  8         6302  
  8         640  
59              
60 8         2303 use Kafka qw(
61             %ERROR
62              
63             $ERROR_NO_ERROR
64             $ERROR_UNKNOWN
65             $ERROR_OFFSET_OUT_OF_RANGE
66             $ERROR_INVALID_MESSAGE
67             $ERROR_UNKNOWN_TOPIC_OR_PARTITION
68             $ERROR_INVALID_FETCH_SIZE
69             $ERROR_LEADER_NOT_AVAILABLE
70             $ERROR_NOT_LEADER_FOR_PARTITION
71             $ERROR_REQUEST_TIMED_OUT
72             $ERROR_BROKER_NOT_AVAILABLE
73             $ERROR_REPLICA_NOT_AVAILABLE
74             $ERROR_MESSAGE_TOO_LARGE
75             $ERROR_STALE_CONTROLLER_EPOCH
76             $ERROR_NETWORK_EXCEPTION
77             $ERROR_GROUP_LOAD_IN_PROGRESS
78             $ERROR_OFFSET_METADATA_TOO_LARGE
79             $ERROR_GROUP_COORDINATOR_NOT_AVAILABLE
80             $ERROR_NOT_COORDINATOR_FOR_GROUP
81             $ERROR_NOT_ENOUGH_REPLICAS
82             $ERROR_NOT_ENOUGH_REPLICAS_AFTER_APPEND
83             $ERROR_REBALANCE_IN_PROGRESS
84             $ERROR_UNSUPPORTED_VERSION
85              
86             $ERROR_CANNOT_BIND
87             $ERROR_CANNOT_GET_METADATA
88             $ERROR_CANNOT_RECV
89             $ERROR_CANNOT_SEND
90             $ERROR_LEADER_NOT_FOUND
91             $ERROR_GROUP_COORDINATOR_NOT_FOUND
92             $ERROR_MISMATCH_ARGUMENT
93             $ERROR_MISMATCH_CORRELATIONID
94             $ERROR_NO_KNOWN_BROKERS
95             $ERROR_RESPONSEMESSAGE_NOT_RECEIVED
96             $ERROR_SEND_NO_ACK
97             $ERROR_UNKNOWN_APIKEY
98             $ERROR_INCOMPATIBLE_HOST_IP_VERSION
99             $ERROR_NO_CONNECTION
100              
101             $IP_V4
102             $IP_V6
103             $KAFKA_SERVER_PORT
104             $NOT_SEND_ANY_RESPONSE
105             $REQUEST_TIMEOUT
106             $RETRY_BACKOFF
107             $SEND_MAX_ATTEMPTS
108 8     8   344 );
  8         17  
109              
110 8     8   1089 use Kafka::Exceptions;
  8         16  
  8         374  
111 8         938 use Kafka::Internals qw(
112             $APIKEY_FETCH
113             $APIKEY_METADATA
114             $APIKEY_OFFSET
115             $APIKEY_PRODUCE
116             $APIKEY_FINDCOORDINATOR
117             $APIKEY_APIVERSIONS
118             $APIKEY_OFFSETCOMMIT
119             $APIKEY_OFFSETFETCH
120             $MAX_CORRELATIONID
121             $MAX_INT32
122             debug_level
123             _get_CorrelationId
124             format_message
125 8     8   43 );
  8         10  
126 8     8   1247 use Kafka::IO;
  8         14  
  8         314  
127 8         36055 use Kafka::Protocol qw(
128             $BAD_OFFSET
129             $IMPLEMENTED_APIVERSIONS
130             decode_fetch_response
131             decode_metadata_response
132             decode_offset_response
133             decode_produce_response
134             decode_api_versions_response
135             decode_find_coordinator_response
136             decode_offsetcommit_response
137             decode_offsetfetch_response
138             encode_fetch_request
139             encode_metadata_request
140             encode_offset_request
141             encode_produce_request
142             encode_api_versions_request
143             encode_find_coordinator_request
144             encode_offsetcommit_request
145             encode_offsetfetch_request
146 8     8   3435 );
  8         16  
147              
148             =head1 SYNOPSIS
149              
150             use 5.010;
151             use strict;
152             use warnings;
153              
154             use Scalar::Util qw(
155             blessed
156             );
157             use Try::Tiny;
158              
159             # A simple example of Kafka::Connection usage:
160             use Kafka::Connection;
161              
162             # connect to local cluster with the defaults
163             my $connection;
164             try {
165             $connection = Kafka::Connection->new( host => 'localhost' );
166             } catch {
167             my $error = $_;
168             if ( blessed( $error ) && $error->isa( 'Kafka::Exception' ) ) {
169             warn $error->message, "\n", $error->trace->as_string, "\n";
170             exit;
171             } else {
172             die $error;
173             }
174             };
175              
176             # Closes the connection and cleans up
177             $connection->close;
178             undef $connection;
179              
180             =head1 DESCRIPTION
181              
182             The main features of the C class are:
183              
184             =over 3
185              
186             =item *
187              
188             Provides API for communication with Kafka 0.9+ cluster.
189              
190             =item *
191              
192             Performs requests encoding and responses decoding, provides automatic
193             selection or promotion of a leader server from Kafka cluster.
194              
195             =item *
196              
197             Provides information about Kafka cluster.
198              
199             =back
200              
201             =cut
202              
203             my %protocol = (
204             "$APIKEY_PRODUCE" => {
205             decode => \&decode_produce_response,
206             encode => \&encode_produce_request,
207             },
208             "$APIKEY_FETCH" => {
209             decode => \&decode_fetch_response,
210             encode => \&encode_fetch_request,
211             },
212             "$APIKEY_OFFSET" => {
213             decode => \&decode_offset_response,
214             encode => \&encode_offset_request,
215             },
216             "$APIKEY_METADATA" => {
217             decode => \&decode_metadata_response,
218             encode => \&encode_metadata_request,
219             },
220             "$APIKEY_APIVERSIONS" => {
221             decode => \&decode_api_versions_response,
222             encode => \&encode_api_versions_request,
223             },
224             "$APIKEY_FINDCOORDINATOR" => {
225             decode => \&decode_find_coordinator_response,
226             encode => \&encode_find_coordinator_request,
227             },
228             "$APIKEY_OFFSETCOMMIT" => {
229             decode => \&decode_offsetcommit_response,
230             encode => \&encode_offsetcommit_request,
231             },
232             "$APIKEY_OFFSETFETCH" => {
233             decode => \&decode_offsetfetch_response,
234             encode => \&encode_offsetfetch_request,
235             },
236             );
237              
238             =head2 EXPORT
239              
240             The following constants are available for export
241              
242             =cut
243              
244             =head3 C<%RETRY_ON_ERRORS>
245              
246             These are non-fatal errors, which when happen causes refreshing of meta-data from Kafka followed by
247             another attempt to fetch data.
248              
249             =cut
250             # When any of the following error happens, a possible change in meta-data on server is expected.
251             const our %RETRY_ON_ERRORS => (
252             # $ERROR_NO_ERROR => 1, # 0 - No error
253             $ERROR_UNKNOWN => 1, # -1 - An unexpected server error
254             # $ERROR_OFFSET_OUT_OF_RANGE => 1, # 1 - The requested offset is not within the range of offsets maintained by the server
255             $ERROR_INVALID_MESSAGE => 1, # 2 - Retriable - This message has failed its CRC checksum, exceeds the valid size, or is otherwise corrupt
256             $ERROR_UNKNOWN_TOPIC_OR_PARTITION => 1, # 3 - Retriable - This server does not host this topic-partition
257             # $ERROR_INVALID_FETCH_SIZE => 1, # 4 - The requested fetch size is invalid
258             $ERROR_LEADER_NOT_AVAILABLE => 1, # 5 - Retriable - Unable to write due to ongoing Kafka leader selection
259             $ERROR_NOT_LEADER_FOR_PARTITION => 1, # 6 - Retriable - Server is not a leader for partition
260             $ERROR_REQUEST_TIMED_OUT => 1, # 7 - Retriable - Request time-out
261             $ERROR_BROKER_NOT_AVAILABLE => 1, # 8 - Broker is not available
262             $ERROR_REPLICA_NOT_AVAILABLE => 1, # 9 - Replica not available
263             # $ERROR_MESSAGE_TOO_LARGE => 1, # 10 - The request included a message larger than the max message size the server will accept
264             $ERROR_STALE_CONTROLLER_EPOCH => 1, # 11 - The controller moved to another broker
265             # $ERROR_OFFSET_METADATA_TOO_LARGE => 1, # 12 - The metadata field of the offset request was too large
266             $ERROR_NETWORK_EXCEPTION => 1, # 13 Retriable - The server disconnected before a response was received
267             $ERROR_GROUP_LOAD_IN_PROGRESS => 1, # 14 - Retriable - The coordinator is loading and hence can't process requests for this group
268             $ERROR_GROUP_COORDINATOR_NOT_AVAILABLE => 1, # 15 - Retriable - The group coordinator is not available
269             $ERROR_NOT_COORDINATOR_FOR_GROUP => 1, # 16 - Retriable - This is not the correct coordinator for this group
270              
271             # $ERROR_INVALID_TOPIC_EXCEPTION => 1, # 17 - The request attempted to perform an operation on an invalid topic
272             # $ERROR_RECORD_LIST_TOO_LARGE => 1, # 18 - The request included message batch larger than the configured segment size on the server
273             $ERROR_NOT_ENOUGH_REPLICAS => 1, # 19 - Retriable - Messages are rejected since there are fewer in-sync replicas than required
274             $ERROR_NOT_ENOUGH_REPLICAS_AFTER_APPEND => 1, # 20 - Retriable - Messages are written to the log, but to fewer in-sync replicas than required
275             # $ERROR_INVALID_REQUIRED_ACKS => 1, # 21 - Produce request specified an invalid value for required acks
276             # $ERROR_ILLEGAL_GENERATION => 1, # 22 - Specified group generation id is not valid
277             # $ERROR_INCONSISTENT_GROUP_PROTOCOL => 1, # 23 - The group member's supported protocols are incompatible with those of existing members
278             # $ERROR_INVALID_GROUP_ID => 1, # 24 - The configured groupId is invalid
279             # $ERROR_UNKNOWN_MEMBER_ID => 1, # 25 - The coordinator is not aware of this member
280             # $ERROR_INVALID_SESSION_TIMEOUT => 1, # 26 - The session timeout is not within the range allowed by the broker (as configured by group.min.session.timeout.ms and group.max.session.timeout.ms)
281             $ERROR_REBALANCE_IN_PROGRESS => 1, # 27 - The group is rebalancing, so a rejoin is needed
282             # $ERROR_INVALID_COMMIT_OFFSET_SIZE => 1, # 28 - The committing offset data size is not valid
283             # $ERROR_TOPIC_AUTHORIZATION_FAILED => 1, # 29 - Not authorized to access topics: Topic authorization failed
284             # $ERROR_GROUP_AUTHORIZATION_FAILED => 1, # 30 - Not authorized to access group: Group authorization failed
285             # $ERROR_CLUSTER_AUTHORIZATION_FAILED => 1, # 31 - Cluster authorization failed
286             # $ERROR_INVALID_TIMESTAMP => 1, # 32 - The timestamp of the message is out of acceptable range
287             # $ERROR_UNSUPPORTED_SASL_MECHANISM => 1, # 33 - The broker does not support the requested SASL mechanism
288             # $ERROR_ILLEGAL_SASL_STATE => 1, # 34 - Request is not valid given the current SASL state
289             # $ERROR_UNSUPPORTED_VERSION => 1, # 35 - The version of API is not supported
290             $ERROR_NO_CONNECTION => 1, # may be disconnected due to idle timeout etc.
291             );
292              
293             #-- constructor ----------------------------------------------------------------
294              
295             =head2 CONSTRUCTOR
296              
297             =head3 C
298              
299             Creates C object for interaction with Kafka cluster.
300             Returns created C object.
301              
302             C takes arguments in key-value pairs. The following arguments are currently recognized:
303              
304             =over 3
305              
306             =item C $host>
307              
308             C<$host> is any Apache Kafka cluster host to connect to. It can be a hostname or the
309             IP-address in the "xx.xx.xx.xx" form.
310              
311             Optional. Either C or C must be supplied.
312              
313             WARNING:
314              
315             Make sure that you always connect to brokers using EXACTLY the same address or host name
316             as specified in broker configuration (host.name in server.properties).
317             Avoid using default value (when host.name is commented out) in server.properties - always use explicit value instead.
318              
319             =item C $port>
320              
321             Optional, default = C<$KAFKA_SERVER_PORT>.
322              
323             C<$port> is the attribute denoting the port number of the service we want to
324             access (Apache Kafka service). C<$port> should be an integer number.
325              
326             C<$KAFKA_SERVER_PORT> is the default Apache Kafka server port constant (C<9092>) that can
327             be imported from the L module.
328              
329             =item C $broker_list>
330              
331             Optional, C<$broker_list> is a reference to array of the host:port or [IPv6_host]:port strings, defining the list
332             of Kafka servers. This list will be used to locate the new leader if the server specified
333             via C $host> and C $port> arguments becomes unavailable. Either C
334             or C must be supplied.
335              
336             =item C $ip_version>
337              
338             Specify version of IP for interpreting of passed IP address and resolving of host name.
339              
340             Optional, undefined by default, which works in the following way: version of IP address
341             is detected automatically, host name is resolved into IPv4 address.
342              
343             See description of L<$IP_V4|Kafka::IO/$IP_V4>, L<$IP_V6|Kafka::IO/$IP_V6>
344             in C L.
345              
346             =item C $timeout>
347              
348             Optional, default = C<$Kafka::REQUEST_TIMEOUT>.
349              
350             C<$timeout> specifies how long we wait for the remote server to respond.
351             C<$timeout> is in seconds, could be a positive integer or a floating-point number not bigger than int32 positive integer.
352              
353             Special behavior when C is set to C:
354              
355             =back
356              
357             =over 3
358              
359             =item *
360              
361             Alarms are not used internally (namely when performing C).
362              
363             =item *
364              
365             Default C<$REQUEST_TIMEOUT> is used for the rest of IO operations.
366              
367             =back
368              
369             =over 3
370              
371             =item C $attempts>
372              
373             Optional, int32 signed integer, default = C<$Kafka::SEND_MAX_ATTEMPTS> .
374              
375             In some circumstances (leader is temporarily unavailable, outdated metadata, etc) we may fail to send a message.
376             This property specifies the maximum number of attempts to send a message.
377             The C<$attempts> should be an integer number.
378              
379             =item C $backoff>
380              
381             Optional, default = C<$Kafka::RETRY_BACKOFF> .
382              
383             Since leader election takes a bit of time, this property specifies the amount of time,
384             in milliseconds, that the producer waits before refreshing the metadata.
385             The C<$backoff> should be an integer number.
386              
387             =item C $mode>
388              
389             Optional, default value is 0 (false).
390              
391             Kafka BUG "[KAFKA-1124]" (Fixed in Kafka 0.8.2):
392             I controls how this module handles the first access to non-existent topic
393             when C in server configuration is C.
394             If I is false (default),
395             the first access to non-existent topic produces an exception;
396             however, the topic is created and next attempts to access it will succeed.
397              
398             If I is true, this module waits
399             (according to the C and C properties)
400             until the topic is created,
401             to avoid errors on the first access to non-existent topic.
402              
403             If C in server configuration is C, this setting has no effect.
404              
405             =item C $number>
406              
407             Optional, default value is 100.
408              
409             Defines maximum number of last non-fatal errors that we keep in log. Use method L to
410             access those errors.
411              
412             =item C $boolean>
413              
414             Optional, default value is 0 (false).
415              
416             If set to false, when communicating with a broker, the client will
417             automatically try to find out the best version numbers to use for each of the
418             API endpoints.
419              
420             If set to true, the client will always use
421             C<$Kafka::Protocol::DEFAULT_APIVERSION> as API version.
422              
423             WARNING: API versions are supported starting from Kafka 0.10. Set this parameter to true
424             if you're connecting to 0.9.
425              
426             =back
427              
428             =cut
429             sub new {
430 233     233 1 122706 my ( $class, %p ) = @_;
431              
432 233         1758 my $self = bless {
433             host => q{},
434             port => $KAFKA_SERVER_PORT,
435             broker_list => [],
436             timeout => $REQUEST_TIMEOUT,
437             ip_version => undef,
438             SEND_MAX_ATTEMPTS => $SEND_MAX_ATTEMPTS,
439             RETRY_BACKOFF => $RETRY_BACKOFF,
440             AutoCreateTopicsEnable => 0,
441             MaxLoggedErrors => 100,
442             dont_load_supported_api_versions => 0,
443             }, $class;
444              
445 233   66     2677 exists $p{$_} and $self->{$_} = $p{$_} foreach keys %$self;
446              
447             $self->_error( $ERROR_MISMATCH_ARGUMENT, 'host' )
448 233 100 100     2335 unless defined( $self->{host} ) && ( $self->{host} eq q{} || defined( _STRING( $self->{host} ) ) ) && !utf8::is_utf8( $self->{host} );
      100        
      100        
449             $self->_error( $ERROR_MISMATCH_ARGUMENT, 'port' )
450 224 100       5558 unless _POSINT( $self->{port} );
451             $self->_error( $ERROR_MISMATCH_ARGUMENT, format_message( 'timeout (%s)', $self->{timeout} ) )
452 209 50 66     3108 unless !defined( $self->{timeout} ) || ( defined _NUMBER( $self->{timeout} ) && int( 1000 * $self->{timeout} ) >= 1 && int( $self->{timeout} * 1000 ) <= $MAX_INT32 );
      66        
      33        
453             $self->_error( $ERROR_MISMATCH_ARGUMENT, 'broker_list' )
454 199 100       537 unless _ARRAY0( $self->{broker_list} );
455             $self->_error( $ERROR_MISMATCH_ARGUMENT, 'SEND_MAX_ATTEMPTS' )
456 183 100       3116 unless _POSINT( $self->{SEND_MAX_ATTEMPTS} );
457             $self->_error( $ERROR_MISMATCH_ARGUMENT, 'RETRY_BACKOFF' )
458 168 100       3469 unless _POSINT( $self->{RETRY_BACKOFF} );
459             $self->_error( $ERROR_MISMATCH_ARGUMENT, 'MaxLoggedErrors' )
460 153 50       3038 unless defined( _NONNEGINT( $self->{MaxLoggedErrors} ) );
461              
462 153         1150 my $ip_version = $self->{ip_version};
463 153 100 66     368 $self->_error( $ERROR_MISMATCH_ARGUMENT, format_message( 'ip_version (%s)', $ip_version ) )
      66        
      66        
464             unless ( !defined( $ip_version ) || ( defined( _NONNEGINT( $ip_version ) ) && ( $ip_version == $IP_V4 || $ip_version == $IP_V6 ) ) );
465              
466 151         281 $self->{_metadata} = {}; # {
467             # TopicName => {
468             # Partition => {
469             # 'Leader' => ...,
470             # 'Replicas' => [
471             # ...,
472             # ],
473             # 'Isr' => [
474             # ...,
475             # ],
476             # },
477             # ...,
478             # },
479             # ...,
480             # }
481 151         246 $self->{_leaders} = {}; # {
482             # NodeId => host:port or [IPv6_host]:port,
483             # ...,
484             # }
485 151         243 $self->{_group_coordinators} = {}; # {
486             # GroupId => host:port or [IPv6_host]:port,
487             # ...,
488             # }
489 151         227 $self->{_nonfatal_errors} = [];
490 151         247 my $IO_cache = $self->{_IO_cache} = {}; # host:port or [IPv6_host]:port => {
491             # 'NodeId' => ...,
492             # 'IO' => ...,
493             # 'timeout' => ...,
494             # 'host' => ...,
495             # 'port' => ...,
496             # 'error' => ...,
497             # },
498             # ...,
499              
500             # init IO cache
501 151 100       577 foreach my $server ( ( $self->{host} ? $self->_build_server_name( $self->{host}, $self->{port} ) : (), @{ $self->{broker_list} } ) ) {
  151         324  
502 169 100       386 $self->_error( $ERROR_MISMATCH_ARGUMENT, 'bad host:port or broker_list element' )
503             unless $self->_is_like_server( $server );
504 150         285 my ( $host, $port ) = _split_host_port( $server );
505 150         314 my $correct_server = $self->_build_server_name( $host, $port );
506 150         678 $IO_cache->{ $correct_server } = {
507             NodeId => undef,
508             IO => undef,
509             host => $host,
510             port => $port,
511             };
512             }
513              
514 132 100       338 $self->_error( $ERROR_MISMATCH_ARGUMENT, 'server is not specified' )
515             unless keys( %$IO_cache );
516              
517 131         995 return $self;
518             }
519              
520             #-- public attributes ----------------------------------------------------------
521              
522             =head2 METHODS
523              
524             The following methods are defined for the C class:
525              
526             =cut
527              
528             #-- public methods -------------------------------------------------------------
529              
530             =head3 C
531              
532             Returns the list of known Kafka servers (in host:port or [IPv6_host]:port format).
533              
534             =cut
535             sub get_known_servers {
536 145     145 1 2733 my ( $self ) = @_;
537              
538 145         317 return keys %{ $self->{_IO_cache} };
  145         765  
539             }
540              
541             sub _get_api_versions {
542 10124     10124   11780 my ( $self, $server ) = @_;
543              
544 10124         11805 my $server_metadata = $self->{_IO_cache}->{$server};
545 10124 50       12556 defined $server_metadata
546             or die "Fatal error: server '$server' is unknown in IO cache, which should not happen";
547              
548             # if we have cached data, just use it
549             defined $server_metadata->{_api_versions}
550 10124 100       19374 and return $server_metadata->{_api_versions};
551              
552             # no cached data. Initialize empty one
553 96         283 my $server_api_versions = $server_metadata->{_api_versions} = {};
554              
555             # use empty data if client doesn't want to detect API versions
556             $self->{dont_load_supported_api_versions}
557 96 50       473 and return $server_api_versions;
558              
559             # call the server and try to get the supported API versions
560 0         0 my $api_versions = [];
561 0         0 my $error;
562             try {
563             # The ApiVersions API endpoint is only supported on Kafka versions >
564             # 0.10.0.0 so this call may fail. We simply ignore this failure and
565             # carry on.
566 0     0   0 $api_versions = $self->_get_supported_api_versions( $server );
567             }
568             catch {
569 0     0   0 $error = $_;
570 0         0 };
571              
572 0 0       0 if( defined $error ) {
573 0 0 0     0 if ( blessed( $error ) && $error->isa( 'Kafka::Exception' ) ) {
574 0 0       0 if( $error->code == $ERROR_MISMATCH_ARGUMENT ) {
575             # rethrow known fatal errors
576 0         0 die $error;
577             }
578 0         0 $self->_remember_nonfatal_error( $error->code, $error, $server );
579             } else {
580 0         0 die $error;
581             }
582             }
583              
584 0         0 foreach my $element (@$api_versions) {
585             # we want to choose which api version to use for each API call. We
586             # try to use the max version that the server supports, with
587             # fallback to the max version the protocol implements. If it's
588             # lower than the min version the kafka server supports, we set it
589             # to -1. If thie API endpoint is called, it'll die.
590 0         0 my $kafka_min_version = $element->{MinVersion};
591 0         0 my $kafka_max_version = $element->{MaxVersion};
592 0         0 my $api_key = $element->{ApiKey};
593 0   0     0 my $implemented_max_version = $IMPLEMENTED_APIVERSIONS->{$api_key} // -1;
594 0         0 my $version = $kafka_max_version;
595 0 0       0 $version > $implemented_max_version
596             and $version = $implemented_max_version;
597 0 0       0 $version < $kafka_min_version
598             and $version = -1;
599 0         0 $server_api_versions->{$api_key} = $version;
600             }
601              
602 0         0 return $server_api_versions;
603             }
604              
605             # Returns the list of supported API versions. This is not really. *Warning*,
606             # this call works only against Kafka 1.10.0.0
607              
608             sub _get_supported_api_versions {
609 0     0   0 my ( $self, $broker ) = @_;
610              
611 0         0 my $CorrelationId = _get_CorrelationId();
612 0         0 my $decoded_request = {
613             CorrelationId => $CorrelationId,
614             ClientId => q{},
615             ApiVersion => 0,
616             };
617 0 0       0 say STDERR format_message( '[%s] apiversions request: %s',
618             scalar( localtime ),
619             $decoded_request,
620             ) if $self->debug_level;
621 0         0 my $encoded_request = $protocol{ $APIKEY_APIVERSIONS }->{encode}->( $decoded_request );
622              
623 0         0 my $encoded_response_ref;
624              
625             # receive apiversions. We use a code block because it's actually a loop where
626             # you can do last.
627             {
628 0 0       0 $self->_connectIO( $broker )
  0         0  
629             or last;
630 0 0       0 my $sent = $self->_sendIO( $broker, $encoded_request )
631             or last;
632 0         0 $encoded_response_ref = $self->_receiveIO( $broker );
633             }
634              
635 0 0       0 unless ( $encoded_response_ref ) {
636             # NOTE: it is possible to repeat the operation here
637 0         0 $self->_error( $ERROR_CANNOT_RECV );
638             }
639              
640 0         0 my $decoded_response = $protocol{ $APIKEY_APIVERSIONS }->{decode}->( $encoded_response_ref );
641 0 0       0 say STDERR format_message( '[%s] apiversions response: %s',
642             scalar( localtime ),
643             $decoded_response,
644             ) if $self->debug_level;
645 0 0 0     0 ( defined( $decoded_response->{CorrelationId} ) && $decoded_response->{CorrelationId} == $CorrelationId )
646             # FATAL error
647             or $self->_error( $ERROR_MISMATCH_CORRELATIONID );
648 0         0 my $ErrorCode = $decoded_response->{ErrorCode};
649              
650             # we asked a Kafka < 0.10 ( in this case the call is not
651             # implemented and it dies
652 0 0       0 $ErrorCode == $ERROR_NO_ERROR
653             or $self->_error($ErrorCode);
654              
655 0         0 my $api_versions = $decoded_response->{ApiVersions};
656 0         0 return $api_versions;
657             }
658              
659             =head3 C
660              
661             If C<$topic> is present, it must be a non-false string of non-zero length.
662              
663             If C<$topic> is absent, this method returns metadata for all topics.
664              
665             Updates kafka cluster's metadata description and returns the hash reference to metadata,
666             which can be schematically described as:
667              
668             {
669             TopicName => {
670             Partition => {
671             'Leader' => ...,
672             'Replicas' => [
673             ...,
674             ],
675             'Isr' => [
676             ...,
677             ],
678             },
679             ...,
680             },
681             ...,
682             }
683              
684             Consult Kafka "Wire protocol" documentation for more details about metadata structure.
685              
686             =cut
687             sub get_metadata {
688 1     1 1 904 my ( $self, $topic ) = @_;
689              
690 1 50 33     14 $self->_error( $ERROR_MISMATCH_ARGUMENT, 'topic' )
      33        
      33        
691             unless !defined( $topic ) || ( ( $topic eq q{} || defined( _STRING( $topic ) ) ) && !utf8::is_utf8( $topic ) );
692              
693 1 50       4 $self->_update_metadata( $topic )
694             # FATAL error
695             or $self->_error( $ERROR_CANNOT_GET_METADATA, format_message( "topic='%s'", $topic ) );
696              
697 1         22 my $clone;
698 1 50       5 if ( defined $topic ) {
699             $clone = {
700 1         49 $topic => dclone( $self->{_metadata}->{ $topic } )
701             };
702             } else {
703 0         0 $clone = dclone( $self->{_metadata} );
704             }
705              
706 1         5 return $clone;
707             }
708              
709             =head3 C
710              
711             Returns true, if C<$server> (host:port or [IPv6_host]:port) is known in cluster.
712              
713             =cut
714             sub is_server_known {
715 43     43 1 816 my ( $self, $server ) = @_;
716              
717 43 100       74 $self->_error( $ERROR_MISMATCH_ARGUMENT )
718             unless $self->_is_like_server( $server );
719              
720 5         25 return exists $self->{_IO_cache}->{ $server };
721             }
722              
723             # Returns true, if known C<$server> (host:port or [IPv6_host]:port) is accessible.
724             # Checks the accessibility of the server.
725             # This is evil: opens and closes a NEW connection immediately so do not use unless there is a strong reason for it.
726             sub _is_server_alive {
727 22     22   1797 my ( $self, $server ) = @_;
728              
729 22 100       39 $self->_error( $ERROR_MISMATCH_ARGUMENT )
730             unless $self->_is_like_server( $server );
731              
732 3 50       7 $self->_error( $ERROR_NO_KNOWN_BROKERS, 'has not yet received the metadata?' )
733             unless $self->get_known_servers;
734              
735 3         6 my $io_cache = $self->{_IO_cache};
736             $self->_error( $ERROR_MISMATCH_ARGUMENT, format_message( "Unknown server '%s' (is not found in the metadata)", $server ) )
737 3 100       9 unless exists( $io_cache->{ $server } );
738              
739 2 50       6 if ( my $io = $self->_connectIO( $server ) ) {
740 2         8 return $io->_is_alive;
741             } else {
742 0         0 return;
743             }
744             }
745              
746             # this is evil, do not use unless there is a very strong reason for it
747             sub _is_server_connected {
748 33     33   1490 my ( $self, $server ) = @_;
749              
750 33 100       77 $self->_error( $ERROR_MISMATCH_ARGUMENT )
751             unless $self->_is_like_server( $server );
752              
753 14         40 my $io_cache = $self->{_IO_cache};
754 14         20 my $io;
755 14 100 66     56 unless ( exists( $io_cache->{ $server } ) && ( $io = $io_cache->{ $server }->{IO} ) ) {
756 8         29 return;
757             }
758              
759 6         23 return $io->_is_alive;
760             }
761              
762             =head3 C
763              
764             =over 3
765              
766             =item C<$request>
767              
768             C<$request> is a reference to the hash representing
769             the structure of the request.
770              
771             This method encodes C<$request>, passes it to the leader of cluster, receives reply, decodes and returns
772             it in a form of hash reference.
773              
774             =back
775              
776             WARNING:
777              
778             =over 3
779              
780             =item *
781              
782             This method should be considered private and should not be called by an end user.
783              
784             =item *
785              
786             In order to achieve better performance, this method does not perform arguments validation.
787              
788             =back
789              
790             =over 3
791              
792             =item C<$compression_codec>
793              
794             Optional.
795              
796             C<$compression_codec> sets the required type of C<$messages> compression,
797             if the compression is desirable.
798              
799             Supported codecs:
800             L<$COMPRESSION_NONE|Kafka/$COMPRESSION_NONE>,
801             L<$COMPRESSION_GZIP|Kafka/$COMPRESSION_GZIP>,
802             L<$COMPRESSION_SNAPPY|Kafka/$COMPRESSION_SNAPPY>,
803             L<$COMPRESSION_LZ4|Kafka/$COMPRESSION_LZ4>.
804              
805             NOTE: $COMPRESSION_LZ4 requires Kafka 0.10 or higher, as initial implementation of LZ4 in Kafka did not follow the standard LZ4 framing specification.
806              
807              
808             =back
809              
810             =cut
811             sub receive_response_to_request {
812 10098     10098 1 20803 my ( $self, $request, $compression_codec, $response_timeout ) = @_;
813              
814 10098         12594 my $api_key = $request->{ApiKey};
815              
816 10098   50     21813 my $host_to_send_to = $request->{__send_to__} // 'leader';
817              
818             # WARNING: The current version of the module limited to the following:
819             # supports queries with only one combination of topic + partition (first and only).
820              
821 10098         11929 my $topic_data = $request->{topics}->[0];
822 10098         10862 my $topic_name = $topic_data->{TopicName};
823 10098         12591 my $partition = $topic_data->{partitions}->[0]->{Partition};
824              
825 10098 100 33     9296 if (
      33        
      66        
826 10098         51538 !%{ $self->{_metadata} } # the first request
827             || ( !$self->{AutoCreateTopicsEnable} && defined( $topic_name ) && !exists( $self->{_metadata}->{ $topic_name } ) )
828             ) {
829 52 100       157 $self->_update_metadata( $topic_name ) # hash metadata could be updated
830             # FATAL error
831             or $self->_error( $ERROR_CANNOT_GET_METADATA, format_message( "topic='%s'", $topic_name ), request => $request )
832             ;
833             }
834              
835 10095 50       17293 $request->{CorrelationId} = _get_CorrelationId() unless exists $request->{CorrelationId};
836              
837 10095 50       18598 say STDERR format_message( '[%s] compression_codec=%s, request=%s',
838             scalar( localtime ),
839             $compression_codec,
840             $request,
841             ) if $self->debug_level;
842              
843 10095         13306 my( $ErrorCode, $partition_data, $io_error );
844              
845 10095         9583 my $attempt = 0;
846             # we save the original api version of the request, because in the attempt
847             # loop we might be trying different brokers which may support different api
848             # versions.
849 10095         10913 my $original_request_api_version = $request->{ApiVersion};
850 10095   50     20225 ATTEMPT: while ( ++$attempt <= ( $self->{SEND_MAX_ATTEMPTS} // 1 ) ) {
851 10146         10525 $ErrorCode = $ERROR_NO_ERROR;
852 10146         11138 undef $io_error;
853              
854 10146         11311 my $server;
855 10146 50       14430 if ($host_to_send_to eq 'leader') {
    0          
856             # hash metadata could be updated
857 10146         14435 my $leader = $self->{_metadata}->{ $topic_name }->{ $partition }->{Leader};
858 10146 50       13312 next ATTEMPT unless defined $leader;
859              
860 10146         13529 $server = $self->{_leaders}->{ $leader };
861 10146 50       13943 unless ( $server ) {
862 0         0 $ErrorCode = $ERROR_LEADER_NOT_FOUND;
863 0         0 $self->_remember_nonfatal_error( $ErrorCode, $ERROR{ $ErrorCode }, $server, $topic_name, $partition );
864 0         0 next ATTEMPT;
865             }
866             } elsif ( $host_to_send_to eq 'group_coordinator') {
867 0         0 my $group_id = $request->{GroupId};
868 0 0 0     0 if ( !%{ $self->{_group_coordinators} } && defined $group_id) {
  0         0  
869             # first request
870 0         0 $self->_update_group_coordinators($group_id);
871             }
872 0         0 $server = $self->{_group_coordinators}->{$group_id};
873 0 0       0 unless ( $server ) {
874 0         0 $ErrorCode = $ERROR_GROUP_COORDINATOR_NOT_FOUND;
875 0         0 $self->_remember_nonfatal_error( $ErrorCode, $ERROR{ $ErrorCode }, $server, $topic_name, $partition );
876 0         0 next ATTEMPT;
877             }
878             } else {
879 0         0 die "__send_to__ must be either 'leader', 'group_coordinator', or void (will default to 'leader')";
880             }
881              
882             # Send a request to the server
883 10146 100       16725 if ( $self->_connectIO( $server ) ) {
884             # we can connect to this server, so let's detect the api versions
885             # it and use whatever it supports, except if the request forces us
886             # to use an api version. Warning, the version might end up being
887             # undef if detection against the Kafka server failed, or if
888             # dont_load_supported_api_versions is true. However the Encoder
889             # code knows how to handle it.
890 10142         11876 $request->{ApiVersion} = $original_request_api_version;
891 10142 100       14813 unless( defined $request->{ApiVersion} ) {
892 10124         14217 $request->{ApiVersion} = $self->_get_api_versions( $server )->{ $api_key };
893             # API versions request may fail and the server may be disconnected
894 10124 50       15117 unless( $self->_is_IO_connected( $server ) ) {
895             # this attempt does not count, assuming that _get_api_versions will not try to get them from failing broker again
896 0         0 redo ATTEMPT;
897             }
898             }
899              
900 10142         24465 my $encoded_request = $protocol{ $api_key }->{encode}->( $request, $compression_codec );
901              
902 10142 100       19840 unless ( $self->_sendIO( $server, $encoded_request ) ) {
903 2         6 $io_error = $self->_io_error( $server );
904 2 50       8 $ErrorCode = $io_error ? $io_error->code : $ERROR_CANNOT_SEND;
905 2         45 $self->_closeIO( $server, 1 );
906             }
907             }
908             else {
909 4         13 $io_error = $self->_io_error( $server );
910 4 50       19 $ErrorCode = $io_error ? $io_error->code : $ERROR_CANNOT_BIND;
911             }
912              
913 10146 100       18093 if ( $ErrorCode != $ERROR_NO_ERROR ) {
914             # could not send request due to non-fatal IO error (fatal errors should be thrown by connectIO/sendIO already)
915 6         16 $self->_remember_nonfatal_error( $ErrorCode, $self->_io_error( $server ), $server, $topic_name, $partition );
916 6 50 33     30 if( $api_key == $APIKEY_PRODUCE && !( $ErrorCode == $ERROR_CANNOT_BIND || $ErrorCode == $ERROR_NO_CONNECTION ) ) {
      66        
917             # do not retry failed produce requests which may have sent some data already
918 0         0 $ErrorCode = $ERROR_CANNOT_SEND;
919 0         0 last ATTEMPT;
920             }
921 6         16 next ATTEMPT;
922             }
923              
924 10140         11004 my $response;
925 10140 100 100     25966 if ( $api_key == $APIKEY_PRODUCE && $request->{RequiredAcks} == $NOT_SEND_ANY_RESPONSE ) {
926             # Do not receive a response, self-forming own response
927             $response = {
928             CorrelationId => $request->{CorrelationId},
929 5009         20124 topics => [
930             {
931             TopicName => $topic_name,
932             partitions => [
933             {
934             Partition => $partition,
935             ErrorCode => 0,
936             Offset => $BAD_OFFSET,
937             },
938             ],
939             },
940             ],
941             };
942             } else {
943 5131         8972 my $encoded_response_ref = $self->_receiveIO( $server, $response_timeout );
944 5131 100       9255 unless ( $encoded_response_ref ) {
945 2 100       5 if ( $api_key == $APIKEY_PRODUCE ) {
946             # WARNING: Unfortunately, the sent package (one or more messages) does not have a unique identifier
947             # and there is no way to verify the delivery of data
948 1         2 $ErrorCode = $ERROR_SEND_NO_ACK;
949              
950             # Should not be allowed to re-send data on the next attempt
951             # FATAL error
952 1         4 $self->_error( $ErrorCode, "no ack for request", io_error => $self->_io_error( $server ), request => $request );
953 0         0 last ATTEMPT;
954             } else {
955 1         2 $ErrorCode = $ERROR_CANNOT_RECV;
956 1         3 $self->_remember_nonfatal_error( $ErrorCode, $self->_io_error( $server ), $server, $topic_name, $partition );
957 1         3 next ATTEMPT;
958             }
959             }
960 5129 100       8319 if ( length( $$encoded_response_ref ) > 4 ) { # MessageSize => int32
961             # we also pass the api version that was used for the request,
962             # so that we know how to decode the response
963 5128         13830 $response = $protocol{ $api_key }->{decode}->( $encoded_response_ref, $request->{ApiVersion} );
964 5128 50       11989 say STDERR format_message( '[%s] response: %s',
965             scalar( localtime ),
966             $response,
967             ) if $self->debug_level;
968             } else {
969 1         4 $self->_error( $ERROR_RESPONSEMESSAGE_NOT_RECEIVED, format_message("response length=%s", length( $$encoded_response_ref ) ), io_error => $self->_io_error( $server ), request => $request );
970             }
971             }
972              
973             # FATAL error if correllation does not match
974             $self->_error( $ERROR_MISMATCH_CORRELATIONID, "$response->{CorrelationId} != $request->{CorrelationId}", request => $request, response => $response )
975             unless $response->{CorrelationId} == $request->{CorrelationId}
976 10137 50       19347 ;
977 10137         13778 $topic_data = $response->{topics}->[0];
978 10137 100       19211 $partition_data = $topic_data->{ $api_key == $APIKEY_OFFSET ? 'PartitionOffsets' : 'partitions' }->[0];
979              
980 10137         11158 $ErrorCode = $partition_data->{ErrorCode};
981              
982 10137 100       29977 return $response if $ErrorCode == $ERROR_NO_ERROR; # success
983              
984 84 50 33     503 if( $api_key == $APIKEY_PRODUCE && $ErrorCode == $ERROR_REQUEST_TIMED_OUT ) {
985             # special case: produce request timed out so we did not get expected ACK and should not retry sending request again
986             # Should not be allowed to re-send data on the next attempt
987             # FATAL error
988 0         0 $self->_error( $ERROR_SEND_NO_ACK, format_message( "topic='%s', partition=%s response error: %s", $topic_name, $partition, $ErrorCode ), request => $request, response => $response );
989 0         0 last ATTEMPT;
990             }
991              
992 84 100       351 if ( exists $RETRY_ON_ERRORS{ $ErrorCode } ) {
993 64         382 $self->_remember_nonfatal_error( $ErrorCode, $ERROR{ $ErrorCode }, $server, $topic_name, $partition );
994 64         259 next ATTEMPT;
995             }
996              
997             # FATAL error
998 20         68 $self->_error( $ErrorCode, format_message( "topic='%s', partition=%s", $topic_name, $partition ), request => $request );
999             } continue {
1000             # Expect to possible changes in the situation, such as restoration of connection
1001             say STDERR format_message( '[%s] sleeping for %d ms before making request attempt #%d (%s)',
1002             scalar( localtime ),
1003             $self->{RETRY_BACKOFF},
1004 71 0       202 $attempt + 1,
    50          
1005             $ErrorCode == $ERROR_NO_ERROR ? 'refreshing metadata' : "ErrorCode ${ErrorCode}",
1006             ) if $self->debug_level;
1007              
1008 71         28214244 Time::HiRes::sleep( $self->{RETRY_BACKOFF} / 1000 );
1009              
1010 71 100 33     1728 $self->_update_metadata( $topic_name )
1011             # FATAL error
1012             or $self->_error( $ErrorCode || $ERROR_CANNOT_GET_METADATA, format_message( "topic='%s', partition=%s", $topic_name, $partition ), request => $request )
1013             ;
1014 68 50       531 if ( $host_to_send_to eq 'group_coordinator') {
1015             $self->_update_group_coordinators($request->{GroupId})
1016 0         0 }
1017             }
1018              
1019             # FATAL error
1020 17 50       52 if ( $ErrorCode ) {
1021 17 100       142 $self->_error( $ErrorCode, format_message( "topic='%s'%s", $topic_data->{TopicName}, $partition_data ? ", partition = ".$partition_data->{Partition} : '' ), request => $request, io_error => $io_error );
1022             } else {
1023 0         0 $self->_error( $ERROR_UNKNOWN_TOPIC_OR_PARTITION, format_message( "topic='%s', partition=%s", $topic_name, $partition ), request => $request, io_error => $io_error );
1024             }
1025              
1026 0         0 return;
1027             }
1028              
1029             =head3 C
1030              
1031             Returns true if the metadata contains information about specified combination of topic and partition.
1032             Otherwise returns false.
1033              
1034             C takes the following arguments:
1035              
1036             =over 3
1037              
1038             =item C<$topic>
1039              
1040             The C<$topic> must be a normal non-false string of non-zero length.
1041              
1042             =item C<$partition>
1043              
1044             =back
1045              
1046             =cut
1047             sub exists_topic_partition {
1048 3     3 1 475 my ( $self, $topic, $partition ) = @_;
1049              
1050 3 50 33     25 $self->_error( $ERROR_MISMATCH_ARGUMENT, 'topic' )
      33        
      33        
1051             unless defined( $topic ) && ( $topic eq q{} || defined( _STRING( $topic ) ) ) && !utf8::is_utf8( $topic );
1052 3 50 33     17 $self->_error( $ERROR_MISMATCH_ARGUMENT, 'partition' )
      33        
1053             unless defined( $partition ) && isint( $partition ) && $partition >= 0;
1054              
1055 3 50       3 unless ( %{ $self->{_metadata} } ) { # the first request
  3         7  
1056 0 0       0 $self->_update_metadata( $topic ) # hash metadata could be updated
1057             # FATAL error
1058             or $self->_error( $ERROR_CANNOT_GET_METADATA, format_message( "topic='%s'", $topic ) );
1059             }
1060              
1061 3         13 return exists $self->{_metadata}->{ $topic }->{ $partition };
1062             }
1063              
1064             =head3 C
1065              
1066             Closes connection with C<$server> (defined as host:port or [IPv6_host]:port).
1067              
1068             =cut
1069             sub close_connection {
1070 22     22 1 1767 my ( $self, $server ) = @_;
1071              
1072 22 50       48 unless ( $self->is_server_known( $server ) ) {
1073 0         0 return;
1074             }
1075              
1076 3         12 $self->_closeIO( $server );
1077 3         13 return 1;
1078             }
1079              
1080             =head3 C
1081              
1082             Closes connection with all known Kafka servers.
1083              
1084             =cut
1085             sub close {
1086 7     7 1 47122 my ( $self ) = @_;
1087              
1088 7         18 foreach my $server ( $self->get_known_servers ) {
1089 19         33 $self->_closeIO( $server );
1090             }
1091              
1092 7         62 return;
1093             }
1094              
1095             =head3 C
1096              
1097             Returns a reference to a hash.
1098              
1099             Each hash key is the identifier of the server (host:port or [IPv6_host]:port), and the value is the last communication error
1100             with that server.
1101              
1102             An empty hash is returned if there were no communication errors.
1103              
1104             =cut
1105             sub cluster_errors {
1106 2     2 1 1023 my ( $self ) = @_;
1107              
1108 2         5 my %errors;
1109 2         5 foreach my $server ( $self->get_known_servers ) {
1110 4 100       8 if ( my $error = $self->_io_error( $server ) ) {
1111 3         19 $errors{ $server } = $error;
1112             }
1113             }
1114              
1115 2         7 return \%errors;
1116             }
1117              
1118             =head3 C
1119              
1120             Returns a reference to an array of the last non-fatal errors.
1121              
1122             Maximum number of entries is set using C parameter of L.
1123              
1124             A reference to the empty array is returned if there were no non-fatal errors or parameter C
1125             is set to 0.
1126              
1127             =cut
1128             sub nonfatal_errors {
1129 81     81 1 79923 my ( $self ) = @_;
1130              
1131 81         343 return $self->{_nonfatal_errors};
1132             }
1133              
1134             =head3 C
1135              
1136             Clears an array of the last non-fatal errors.
1137              
1138             A reference to the empty array is returned because there are no non-fatal errors now.
1139              
1140             =cut
1141             sub clear_nonfatals {
1142 0     0 1 0 my ( $self ) = @_;
1143              
1144 0         0 @{ $self->{_nonfatal_errors} } = ();
  0         0  
1145              
1146 0         0 return $self->{_nonfatal_errors};
1147             }
1148              
1149             #-- private attributes ---------------------------------------------------------
1150              
1151             #-- private functions ----------------------------------------------------------
1152              
1153             sub _split_host_port {
1154 372     372   545 my ( $server ) = @_;
1155              
1156 372         1364 my ( $host, $port ) = $server=~ /^(.+):(\d+)$/;
1157 372 50 66     1114 $host = $1 if $host && $host =~ /^\[(.+)\]$/;
1158              
1159 372         811 return( $host, $port );
1160             }
1161              
1162             #-- private methods ------------------------------------------------------------
1163              
1164             # Remember non-fatal error
1165             sub _remember_nonfatal_error {
1166 71     71   306 my ( $self, $error_code, $error, $server, $topic, $partition ) = @_;
1167              
1168             my $max_logged_errors = $self->{MaxLoggedErrors}
1169 71 50       254 or return;
1170              
1171 0         0 shift( @{ $self->{_nonfatal_errors} } )
1172 71 50       98 if scalar( @{ $self->{_nonfatal_errors} } ) == $max_logged_errors;
  71         237  
1173             my $msg = format_message( "[%s] Non-fatal error: %s (ErrorCode %s, server '%s', topic '%s', partition %s)",
1174             scalar( localtime ),
1175 71 0 0     4688 $error // ( defined( $error_code ) && exists( $ERROR{ $error_code } ) ? $ERROR{ $error_code } : '' ),
      33        
      50        
1176             $error_code // 'IO error',
1177             $server,
1178             $topic,
1179             $partition,
1180             );
1181              
1182 71 50       290 say STDERR $msg
1183             if $self->debug_level;
1184              
1185 71         137 push @{ $self->{_nonfatal_errors} }, $msg;
  71         192  
1186              
1187 71         149 return $msg;
1188             }
1189              
1190             # Returns identifier of the cluster leader (host:port or [IPv6_host]:port)
1191             sub _find_leader_server {
1192 118     118   285 my ( $self, $node_id ) = @_;
1193              
1194 118         147 my $leader_server;
1195 118         200 my $IO_cache = $self->{_IO_cache};
1196 118         177 my $NodeId;
1197 118         392 foreach my $server ( keys %$IO_cache ) {
1198 227         328 $NodeId = $IO_cache->{ $server }->{NodeId};
1199 227 100 66     789 if ( defined( $NodeId ) && $NodeId == $node_id ) {
1200 118         179 $leader_server = $server;
1201 118         209 last;
1202             }
1203             }
1204              
1205 118         532 return $leader_server;
1206             }
1207              
1208             # Form a list of servers to attempt querying of the metadata
1209             sub _get_interviewed_servers {
1210 124     124   284 my ( $self ) = @_;
1211              
1212 124         217 my ( @priority, @secondary, @rest );
1213 124         348 my $IO_cache = $self->{_IO_cache};
1214 124         167 my $server_data;
1215 124         389 foreach my $server ( $self->get_known_servers ) {
1216 266         502 $server_data = $IO_cache->{ $server };
1217 266 100       704 if ( defined $server_data->{NodeId} ) {
1218 213 100       539 if ( $server_data->{IO} ) {
1219 135         355 push @priority, $server;
1220             } else {
1221 78         212 push @secondary, $server;
1222             }
1223             } else {
1224 53         95 push @rest, $server;
1225             }
1226             }
1227              
1228 124         1061 return( shuffle( @priority ), shuffle( @secondary ), shuffle( @rest ) );
1229             }
1230              
1231             # Refresh group_coordinators for given topic
1232             sub _update_group_coordinators {
1233 0     0   0 my ($self, $group_id) = @_;
1234              
1235 0         0 my $CorrelationId = _get_CorrelationId();
1236 0         0 my $decoded_request = {
1237             CorrelationId => $CorrelationId,
1238             ClientId => q{},
1239             CoordinatorKey => $group_id,
1240             CoordinatorType => 0, # type is group
1241             };
1242 0 0       0 say STDERR format_message( '[%s] group coordinators request: %s',
1243             scalar( localtime ),
1244             $decoded_request,
1245             ) if $self->debug_level;
1246 0         0 my $encoded_request = $protocol{ $APIKEY_FINDCOORDINATOR }->{encode}->( $decoded_request );
1247              
1248 0         0 my $encoded_response_ref;
1249 0         0 my @brokers = $self->_get_interviewed_servers;
1250              
1251             # receive coordinator data
1252 0         0 foreach my $broker ( @brokers ) {
1253 0 0 0     0 last if $self->_connectIO( $broker )
      0        
1254             && $self->_sendIO( $broker, $encoded_request )
1255             && ( $encoded_response_ref = $self->_receiveIO( $broker ) );
1256             }
1257              
1258 0 0       0 unless ( $encoded_response_ref ) {
1259             # NOTE: it is possible to repeat the operation here
1260 0         0 return;
1261             }
1262              
1263 0         0 my $decoded_response = $protocol{ $APIKEY_FINDCOORDINATOR }->{decode}->( $encoded_response_ref );
1264 0 0       0 say STDERR format_message( '[%s] group coordinators: %s',
1265             scalar( localtime ),
1266             $decoded_response,
1267             ) if $self->debug_level;
1268 0 0 0     0 ( defined( $decoded_response->{CorrelationId} ) && $decoded_response->{CorrelationId} == $CorrelationId )
1269             # FATAL error
1270             or $self->_error( $ERROR_MISMATCH_CORRELATIONID );
1271             $decoded_response->{ErrorCode}
1272 0 0       0 and $self->_error( $decoded_response->{ErrorCode} );
1273              
1274 0         0 my $IO_cache = $self->{_IO_cache};
1275 0         0 my $server = $self->_build_server_name( @{ $decoded_response }{ 'Host', 'Port' } );
  0         0  
1276             $IO_cache->{ $server } = { # can add new servers
1277             IO => $IO_cache->{ $server }->{IO}, # IO or undef
1278             NodeId => $decoded_response->{NodeId},
1279             host => $decoded_response->{Host},
1280             port => $decoded_response->{Port},
1281 0         0 };
1282 0         0 $self->{_group_coordinators}->{ $group_id } = $server;
1283              
1284 0         0 return 1;
1285             }
1286              
1287             # Refresh metadata for given topic
1288             sub _update_metadata {
1289 124     124   710 my ( $self, $topic, $is_recursive_call ) = @_;
1290              
1291 124         911 my $CorrelationId = _get_CorrelationId();
1292 124   33     1589 my $decoded_request = {
1293             CorrelationId => $CorrelationId,
1294             ClientId => q{},
1295             topics => [
1296             $topic // (),
1297             ],
1298             };
1299 124 50       967 say STDERR format_message( '[%s] metadata request: %s',
1300             scalar( localtime ),
1301             $decoded_request,
1302             ) if $self->debug_level;
1303 124         1031 my $encoded_request = $protocol{ $APIKEY_METADATA }->{encode}->( $decoded_request );
1304              
1305 124         258 my $encoded_response_ref;
1306 124         476 my @brokers = $self->_get_interviewed_servers;
1307              
1308             # receive metadata
1309 124         331 foreach my $broker ( @brokers ) {
1310 130 100 66     463 last if $self->_connectIO( $broker )
      100        
1311             && $self->_sendIO( $broker, $encoded_request )
1312             && ( $encoded_response_ref = $self->_receiveIO( $broker ) );
1313             }
1314              
1315 123 100       328 unless ( $encoded_response_ref ) {
1316             # NOTE: it is possible to repeat the operation here
1317 5         45 return;
1318             }
1319              
1320 118         548 my $decoded_response = $protocol{ $APIKEY_METADATA }->{decode}->( $encoded_response_ref );
1321 118 50       395 say STDERR format_message( '[%s] metadata response: %s',
1322             scalar( localtime ),
1323             $decoded_response,
1324             ) if $self->debug_level;
1325 118 50 33     585 ( defined( $decoded_response->{CorrelationId} ) && $decoded_response->{CorrelationId} == $CorrelationId )
1326             # FATAL error
1327             or $self->_error( $ERROR_MISMATCH_CORRELATIONID );
1328              
1329 118 50       526 unless ( _ARRAY( $decoded_response->{Broker} ) ) {
1330 0 0       0 if ( $self->{AutoCreateTopicsEnable} ) {
1331 0         0 return $self->_attempt_update_metadata( $is_recursive_call, $topic, undef, $ERROR_NO_KNOWN_BROKERS );
1332             } else {
1333             # FATAL error
1334 0         0 $self->_error( $ERROR_NO_KNOWN_BROKERS, format_message( "topic='%s'", $topic ) );
1335             }
1336             }
1337              
1338 118         221 my $IO_cache = $self->{_IO_cache};
1339              
1340             # Clear the previous information about the NodeId in the IO cache
1341 118         439 $IO_cache->{ $_ }->{NodeId} = undef for @brokers;
1342              
1343             # In the IO cache update/add obtained server information
1344 118         168 foreach my $received_broker ( @{ $decoded_response->{Broker} } ) {
  118         267  
1345 354         510 my $server = $self->_build_server_name( @{ $received_broker }{ 'Host', 'Port' } );
  354         854  
1346             $IO_cache->{ $server } = { # can add new servers
1347             IO => $IO_cache->{ $server }->{IO}, # IO or undef
1348             NodeId => $received_broker->{NodeId},
1349             host => $received_broker->{Host},
1350             port => $received_broker->{Port},
1351 354         1999 };
1352             }
1353              
1354             #NOTE: IO cache does not remove server that's missing in metadata
1355              
1356             # Collect the received metadata
1357 118         232 my $received_metadata = {};
1358 118         179 my $leaders = {};
1359              
1360 118         238 my $ErrorCode = $ERROR_NO_ERROR;
1361 118         201 my( $TopicName, $partition );
1362             METADATA_CREATION:
1363 118         182 foreach my $topic_metadata ( @{ $decoded_response->{TopicMetadata} } ) {
  118         282  
1364 118         235 $TopicName = $topic_metadata->{TopicName};
1365 118         192 undef $partition;
1366             last METADATA_CREATION
1367 118 50       362 if ( $ErrorCode = $topic_metadata->{ErrorCode} ) != $ERROR_NO_ERROR;
1368              
1369 118         212 foreach my $partition_metadata ( @{ $topic_metadata->{PartitionMetadata} } ) {
  118         245  
1370 118         214 $partition = $partition_metadata->{Partition};
1371             last METADATA_CREATION
1372 118 50 33     400 if ( $ErrorCode = $partition_metadata->{ErrorCode} ) != $ERROR_NO_ERROR
1373             && $ErrorCode != $ERROR_REPLICA_NOT_AVAILABLE;
1374 118         222 $ErrorCode = $ERROR_NO_ERROR;
1375              
1376 118         481 my $received_partition_data = $received_metadata->{ $TopicName }->{ $partition } = {};
1377 118         297 my $leader = $received_partition_data->{Leader} = $partition_metadata->{Leader};
1378 118         207 $received_partition_data->{Replicas} = [ @{ $partition_metadata->{Replicas} } ];
  118         341  
1379 118         202 $received_partition_data->{Isr} = [ @{ $partition_metadata->{Isr} } ];
  118         241  
1380              
1381 118         372 $leaders->{ $leader } = $self->_find_leader_server( $leader );
1382             }
1383             }
1384 118 50       277 if ( $ErrorCode != $ERROR_NO_ERROR ) {
1385 0 0       0 if ( exists $RETRY_ON_ERRORS{ $ErrorCode } ) {
1386 0         0 return $self->_attempt_update_metadata( $is_recursive_call, $TopicName, $partition, $ErrorCode );
1387             } else {
1388             # FATAL error
1389 0 0       0 $self->_error( $ErrorCode, format_message( "topic='%s'%s", $TopicName, defined( $partition ) ? ", partition=$partition" : '' ) );
1390             }
1391             }
1392              
1393             # Update metadata for received topics
1394 118         230 $self->{_metadata}->{ $_ } = $received_metadata->{ $_ } foreach keys %{ $received_metadata };
  118         715  
1395 118         209 $self->{_leaders}->{ $_ } = $leaders->{ $_ } foreach keys %{ $leaders };
  118         451  
1396              
1397 118         1006 return 1;
1398             }
1399              
1400             # trying to get the metadata without error
1401             sub _attempt_update_metadata {
1402 0     0   0 my ( $self, $is_recursive_call, $topic, $partition, $error_code ) = @_;
1403              
1404 0 0       0 return if $is_recursive_call;
1405 0         0 $self->_remember_nonfatal_error( $error_code, $ERROR{ $error_code }, undef, $topic, $partition );
1406              
1407 0         0 my $attempts = $self->{SEND_MAX_ATTEMPTS};
1408             ATTEMPTS:
1409 0         0 while ( $attempts-- ) {
1410             say STDERR format_message( '[%s] sleeping for %d ms before making update metadata attempt #%d',
1411             scalar( localtime ),
1412             $self->{RETRY_BACKOFF},
1413 0 0       0 $self->{SEND_MAX_ATTEMPTS} - $attempts + 1,
1414             ) if $self->debug_level;
1415 0         0 Time::HiRes::sleep( $self->{RETRY_BACKOFF} / 1000 );
1416 0 0       0 return( 1 ) if $self->_update_metadata( $topic, 1 );
1417             }
1418             # FATAL error
1419 0 0       0 $self->_error( $error_code, format_message( "topic='%s'%s", $topic, defined( $partition ) ? ", partition=$partition" : '' ) );
1420              
1421 0         0 return;
1422             }
1423              
1424             # forms server identifier using supplied $host, $port
1425             sub _build_server_name {
1426 654     654   1177 my ( $self, $host, $port ) = @_;
1427              
1428 654 50       1696 $host = "[$host]" if is_ipv6( $host );
1429              
1430 654         10222 return "$host:$port";
1431             }
1432              
1433             # remembers error communicating with the server
1434             sub _on_io_error {
1435 20     20   43 my ( $self, $server_data, $error ) = @_;
1436 20         37 $server_data->{error} = $error;
1437 20 100       46 if( $server_data->{IO} ) {
1438 16         62 $server_data->{IO}->close;
1439 16         140 $server_data->{IO} = undef;
1440             }
1441              
1442 20 50 33     140 if( blessed( $error ) && $error->isa('Kafka::Exception') ) {
1443 20 100 66     414 if( $error->code == $ERROR_MISMATCH_ARGUMENT || $error->code == $ERROR_INCOMPATIBLE_HOST_IP_VERSION ) {
1444             # rethrow known fatal errors
1445 1         20 die $error;
1446             }
1447             } else {
1448             # rethrow all unknown errors
1449 0         0 die $error;
1450             }
1451              
1452 19         462 return;
1453             }
1454              
1455             sub _io_error {
1456 19     19   37 my( $self, $server ) = @_;
1457 19         23 my $error;
1458 19 50       40 if( my $server_data = $self->{_IO_cache}->{ $server } ) {
1459 19         29 $error = $server_data->{error};
1460             }
1461 19         55 return $error;
1462             }
1463              
1464             sub _is_IO_connected {
1465 10124     10124   11763 my ( $self, $server ) = @_;
1466 10124 50       17281 my $server_data = $self->{_IO_cache}->{ $server } or return;
1467 10124         16750 return $server_data->{IO};
1468             }
1469              
1470             # connects to a server (host:port or [IPv6_host]:port)
1471             sub _connectIO {
1472 10278     10278   13212 my ( $self, $server ) = @_;
1473              
1474 10278 50       18215 my $server_data = $self->{_IO_cache}->{ $server }
1475             or $self->_error( $ERROR_MISMATCH_ARGUMENT, format_message( "Unknown server '%s' (is not found in the metadata)", $server ) )
1476             ;
1477 10278 100       18269 unless( $server_data->{IO} ) {
1478 114         127 my $error;
1479             try {
1480             $server_data->{IO} = Kafka::IO->new(
1481             host => $server_data->{host},
1482             port => $server_data->{port},
1483             timeout => $self->{timeout},
1484             ip_version => $self->{ip_version},
1485 114     114   8084 );
1486 110         6973 $server_data->{error} = undef;
1487             } catch {
1488 4     4   3610 $error = $_;
1489 114         733 };
1490              
1491 114 100       1747 if( defined $error ) {
1492 4         14 $self->_on_io_error( $server_data, $error );
1493 4         10 return;
1494             }
1495             }
1496              
1497 10274         17015 return $server_data->{IO};
1498             }
1499              
1500             sub _server_data_IO {
1501 15524     15524   18198 my ( $self, $server ) = @_;
1502 15524 50       30163 my $server_data = $self->{_IO_cache}->{ $server }
1503             or $self->_error( $ERROR_MISMATCH_ARGUMENT, format_message( "Unknown server '%s' (is not found in the metadata)", $server ) )
1504             ;
1505             $self->_error( $ERROR_MISMATCH_ARGUMENT, format_message( "Server '%s' is not connected", $server ) )
1506             unless $server_data->{IO}
1507 15524 50       21968 ;
1508 15524         23976 return ( $server_data, $server_data->{IO} );
1509             }
1510              
1511             # Send encoded request ($encoded_request) to server ($server)
1512             sub _sendIO {
1513 10272     10272   14772 my ( $self, $server, $encoded_request ) = @_;
1514 10272         16796 my( $server_data, $io ) = $self->_server_data_IO( $server );
1515 10272         11936 my $sent;
1516             my $error;
1517             try {
1518 10272     10272   351841 $sent = $io->send( $encoded_request );
1519             } catch {
1520 11     11   10646 $error = $_;
1521 10272         46596 };
1522              
1523 10272 100       245809 if( defined $error ) {
1524 11         33 $self->_on_io_error( $server_data, $error );
1525             }
1526              
1527 10271         25277 return $sent;
1528             }
1529              
1530             # Receive response from a given server
1531             sub _receiveIO {
1532 5252     5252   8509 my ( $self, $server, $response_timeout ) = @_;
1533 5252         8122 my( $server_data, $io ) = $self->_server_data_IO( $server );
1534 5252         6352 my $response_ref;
1535             my $error;
1536             try {
1537 5252     5252   177070 $response_ref = $io->receive( 4, $response_timeout ); # response header must arrive within request-specific timeout if provided
1538 5247 50 33     23575 if ( $response_ref && length( $$response_ref ) == 4 ) {
1539             # received 4-byte response header with response size; try receiving the rest
1540 5247         13741 my $message_body_ref = $io->receive( unpack( 'l>', $$response_ref ) );
1541 5247         23444 $$response_ref .= $$message_body_ref;
1542             }
1543             } catch {
1544 5     5   3638 $error = $_;
1545 5252         23454 };
1546              
1547 5252 100       61200 if( defined $error ) {
1548 5         13 $self->_on_io_error( $server_data, $error );
1549             }
1550              
1551 5252         9178 return $response_ref;
1552             }
1553              
1554             # Close connectino to $server
1555             sub _closeIO {
1556 24     24   43 my ( $self, $server, $keep_error ) = @_;
1557              
1558 24 50       57 if ( my $server_data = $self->{_IO_cache}->{ $server } ) {
1559 24 100       54 if ( my $io = $server_data->{IO} ) {
1560 13         44 $io->close;
1561 13 50       124 $server_data->{error} = undef unless $keep_error;
1562 13         43 $server_data->{IO} = undef;
1563             }
1564             }
1565              
1566 24         38 return;
1567             }
1568              
1569             # check validity of an argument to match host:port format
1570             sub _is_like_server {
1571 267     267   410 my ( $self, $server ) = @_;
1572              
1573 267 100 100     1427 unless (
      100        
1574             defined( $server )
1575             && defined( _STRING( $server ) )
1576             && !utf8::is_utf8( $server )
1577             ) {
1578 45         145 return;
1579             }
1580              
1581 222         410 my ( $host, $port ) = _split_host_port( $server );
1582 222 100 66     685 unless ( ( is_hostname( $host ) || is_ipv4( $host ) || is_ipv6( $host ) ) && $port ) {
      66        
1583 50         1558 return;
1584             }
1585              
1586 172         9788 return $server;
1587             }
1588              
1589             # Handler for errors
1590             sub _error {
1591 226     226   643 my $self = shift;
1592 226         654 Kafka::Exception::Connection->throw( throw_args( @_ ) );
1593             }
1594              
1595             1;
1596              
1597             __END__