Branch Coverage

lib/Kafka/Connection.pm
Criterion Covered Total %
branch 139 250 55.6


line true false branch
448 8 224 unless defined $self->{'host'} and $self->{'host'} eq '' || defined &_STRING($self->{'host'}) and not utf8::is_utf8($self->{'host'})
450 15 209 unless &_POSINT($self->{'port'})
452 0 199 unless not defined $self->{'timeout'} or defined &_NUMBER($self->{'timeout'}) and int 1000 * $self->{'timeout'} >= 1 and int $self->{'timeout'} * 1000 <= $Kafka::Connection::MAX_INT32
454 16 183 unless &_ARRAY0($self->{'broker_list'})
456 15 168 unless &_POSINT($self->{'SEND_MAX_ATTEMPTS'})
458 15 153 unless &_POSINT($self->{'RETRY_BACKOFF'})
460 0 153 unless defined &_NONNEGINT($self->{'MaxLoggedErrors'})
463 2 2 unless not defined $ip_version or defined &_NONNEGINT($ip_version) and $ip_version == $Kafka::Connection::IP_V4 || $ip_version == $Kafka::Connection::IP_V6
501 150 1 $self->{'host'} ? :
502 19 150 unless $self->_is_like_server($server)
514 1 131 unless keys %$IO_cache
545 0 10124 unless defined $server_metadata
550 10028 96 if defined $server_metadata->{'_api_versions'}
557 96 0 if $self->{'dont_load_supported_api_versions'}
572 0 0 if (defined $error)
573 0 0 if (&blessed($error) and $error->isa('Kafka::Exception')) { }
574 0 0 if ($error->code == $Kafka::Connection::ERROR_MISMATCH_ARGUMENT)
595 0 0 if $version > $implemented_max_version
597 0 0 if $version < $kafka_min_version
617 0 0 if $self->debug_level
628 0 0 unless $self->_connectIO($broker)
630 0 0 unless my $sent = $self->_sendIO($broker, $encoded_request)
635 0 0 unless ($encoded_response_ref)
641 0 0 if $self->debug_level
645 0 0 unless defined $decoded_response->{'CorrelationId'} and $decoded_response->{'CorrelationId'} == $CorrelationId
652 0 0 unless $ErrorCode == $Kafka::Connection::ERROR_NO_ERROR
690 0 1 unless not defined $topic or $topic eq '' || defined &_STRING($topic) and not utf8::is_utf8($topic)
693 0 1 unless $self->_update_metadata($topic)
698 1 0 if (defined $topic) { }
717 38 5 unless $self->_is_like_server($server)
729 19 3 unless $self->_is_like_server($server)
732 0 3 unless $self->get_known_servers
737 1 2 unless exists $io_cache->{$server}
739 2 0 if (my $io = $self->_connectIO($server)) { }
750 19 14 unless $self->_is_like_server($server)
755 8 6 unless (exists $io_cache->{$server} and $io = $io_cache->{$server}{'IO'})
825 52 10046 if (not %{$self->{'_metadata'};} or not $self->{'AutoCreateTopicsEnable'} and defined $topic_name and not exists $self->{'_metadata'}{$topic_name})
829 2 49 unless $self->_update_metadata($topic_name)
835 0 10095 unless exists $request->{'CorrelationId'}
837 0 10095 if $self->debug_level
855 10146 0 if ($host_to_send_to eq 'leader') { }
855 0 0 elsif ($host_to_send_to eq 'group_coordinator') { }
858 0 10146 unless defined $leader
861 0 10146 unless ($server)
868 0 0 if (not %{$self->{'_group_coordinators'};} and defined $group_id)
873 0 0 unless ($server)
883 10142 4 if ($self->_connectIO($server)) { }
891 10124 18 unless (defined $request->{'ApiVersion'})
894 0 10124 unless ($self->_is_IO_connected($server))
902 2 10140 unless ($self->_sendIO($server, $encoded_request))
904 2 0 $io_error ? :
910 4 0 $io_error ? :
913 6 10140 if ($ErrorCode != $Kafka::Connection::ERROR_NO_ERROR)
916 0 6 if ($api_key == $Kafka::Connection::APIKEY_PRODUCE and not $ErrorCode == $Kafka::Connection::ERROR_CANNOT_BIND || $ErrorCode == $Kafka::Connection::ERROR_NO_CONNECTION)
925 5009 5131 if ($api_key == $Kafka::Connection::APIKEY_PRODUCE and $request->{'RequiredAcks'} == $Kafka::Connection::NOT_SEND_ANY_RESPONSE) { }
944 2 5129 unless ($encoded_response_ref)
945 1 1 if ($api_key == $Kafka::Connection::APIKEY_PRODUCE) { }
960 5128 1 if (length $$encoded_response_ref > 4) { }
964 0 5128 if $self->debug_level
976 0 10137 unless $response->{'CorrelationId'} == $request->{'CorrelationId'}
978 21 10116 $api_key == $Kafka::Connection::APIKEY_OFFSET ? :
982 10053 84 if $ErrorCode == $Kafka::Connection::ERROR_NO_ERROR
984 0 84 if ($api_key == $Kafka::Connection::APIKEY_PRODUCE and $ErrorCode == $Kafka::Connection::ERROR_REQUEST_TIMED_OUT)
992 64 20 if (exists $RETRY_ON_ERRORS{$ErrorCode})
1004 0 0 $ErrorCode == $Kafka::Connection::ERROR_NO_ERROR ? :
1004 0 71 if $self->debug_level
1010 3 68 unless $self->_update_metadata($topic_name)
1014 0 68 if ($host_to_send_to eq 'group_coordinator')
1020 17 0 if ($ErrorCode) { }
1021 16 1 $partition_data ? :
1050 0 3 unless defined $topic and $topic eq '' || defined &_STRING($topic) and not utf8::is_utf8($topic)
1052 0 3 unless defined $partition and &isint($partition) and $partition >= 0
1055 0 3 unless (%{$self->{'_metadata'};})
1056 0 0 unless $self->_update_metadata($topic)
1072 0 3 unless ($self->is_server_known($server))
1110 3 1 if (my $error = $self->_io_error($server))
1157 0 372 if $host and $host =~ /^\[(.+)\]$/
1169 0 71 unless my $max_logged_errors = $self->{'MaxLoggedErrors'}
1172 0 71 if scalar @{$self->{'_nonfatal_errors'};} == $max_logged_errors
1175 0 0 defined $error_code && exists $Kafka::Connection::ERROR{$error_code} ? :
1182 0 71 if $self->debug_level
1199 118 109 if (defined $NodeId and $NodeId == $node_id)
1217 213 53 if (defined $server_data->{'NodeId'}) { }
1218 135 78 if ($server_data->{'IO'}) { }
1242 0 0 if $self->debug_level
1253 0 0 if $self->_connectIO($broker) and $self->_sendIO($broker, $encoded_request) and $encoded_response_ref = $self->_receiveIO($broker)
1258 0 0 unless ($encoded_response_ref)
1264 0 0 if $self->debug_level
1268 0 0 unless defined $decoded_response->{'CorrelationId'} and $decoded_response->{'CorrelationId'} == $CorrelationId
1272 0 0 if $decoded_response->{'ErrorCode'}
1299 0 124 if $self->debug_level
1310 118 11 if $self->_connectIO($broker) and $self->_sendIO($broker, $encoded_request) and $encoded_response_ref = $self->_receiveIO($broker)
1315 5 118 unless ($encoded_response_ref)
1321 0 118 if $self->debug_level
1325 0 118 unless defined $decoded_response->{'CorrelationId'} and $decoded_response->{'CorrelationId'} == $CorrelationId
1329 0 118 unless (&_ARRAY($decoded_response->{'Broker'}))
1330 0 0 if ($self->{'AutoCreateTopicsEnable'}) { }
1367 0 118 if ($ErrorCode = $topic_metadata->{'ErrorCode'}) != $Kafka::Connection::ERROR_NO_ERROR
1372 0 118 if ($ErrorCode = $partition_metadata->{'ErrorCode'}) != $Kafka::Connection::ERROR_NO_ERROR and $ErrorCode != $Kafka::Connection::ERROR_REPLICA_NOT_AVAILABLE
1384 0 118 if ($ErrorCode != $Kafka::Connection::ERROR_NO_ERROR)
1385 0 0 if (exists $RETRY_ON_ERRORS{$ErrorCode}) { }
1389 0 0 defined $partition ? :
1404 0 0 if $is_recursive_call
1413 0 0 if $self->debug_level
1416 0 0 if $self->_update_metadata($topic, 1)
1419 0 0 defined $partition ? :
1428 0 654 if is_ipv6($host)
1437 16 4 if ($server_data->{'IO'})
1442 20 0 if (&blessed($error) and $error->isa('Kafka::Exception')) { }
1443 1 19 if ($error->code == $Kafka::Connection::ERROR_MISMATCH_ARGUMENT or $error->code == $Kafka::Connection::ERROR_INCOMPATIBLE_HOST_IP_VERSION)
1458 19 0 if (my $server_data = $self->{'_IO_cache'}{$server})
1466 0 10124 unless my $server_data = $self->{'_IO_cache'}{$server}
1474 0 10278 unless my $server_data = $self->{'_IO_cache'}{$server}
1477 114 10164 unless ($server_data->{'IO'})
1491 4 110 if (defined $error)
1502 0 15524 unless my $server_data = $self->{'_IO_cache'}{$server}
1507 0 15524 unless $server_data->{'IO'}
1523 11 10261 if (defined $error)
1538 5247 0 if ($response_ref and length $$response_ref == 4)
1547 5 5247 if (defined $error)
1558 24 0 if (my $server_data = $self->{'_IO_cache'}{$server})
1559 13 11 if (my $io = $server_data->{'IO'})
1561 13 0 unless $keep_error
1573 40 222 unless (defined $server and defined &_STRING($server) and not utf8::is_utf8($server))
1582 50 172 unless (is_hostname($host) || is_ipv4($host) || is_ipv6($host) and $port)