Branch Coverage

lib/Kafka/Connection.pm
Criterion Covered Total %
branch 139 250 55.6


line true false branch
448 8 224 unless defined $self->{'host'} and $self->{'host'} eq '' || defined &_STRING($self->{'host'}) and not utf8::is_utf8($self->{'host'})
450 15 209 unless &_POSINT($self->{'port'})
452 0 199 unless not defined $self->{'timeout'} or defined &_NUMBER($self->{'timeout'}) and int 1000 * $self->{'timeout'} >= 1 and int $self->{'timeout'} * 1000 <= $Kafka::Connection::MAX_INT32
454 16 183 unless &_ARRAY0($self->{'broker_list'})
456 15 168 unless &_POSINT($self->{'SEND_MAX_ATTEMPTS'})
458 15 153 unless &_POSINT($self->{'RETRY_BACKOFF'})
460 0 153 unless defined &_NONNEGINT($self->{'MaxLoggedErrors'})
463 2 2 unless not defined $ip_version or defined &_NONNEGINT($ip_version) and $ip_version == $Kafka::Connection::IP_V4 || $ip_version == $Kafka::Connection::IP_V6
501 150 1 $self->{'host'} ? :
502 19 150 unless $self->_is_like_server($server)
514 1 131 unless keys %$IO_cache
545 0 10124 unless defined $server_metadata
550 10028 96 if defined $server_metadata->{'_api_versions'}
557 96 0 if $self->{'dont_load_supported_api_versions'}
572 0 0 if (defined $error)
573 0 0 if (&blessed($error) and $error->isa('Kafka::Exception')) { }
574 0 0 if ($error->code == $Kafka::Connection::ERROR_MISMATCH_ARGUMENT)
595 0 0 if $version > $implemented_max_version
597 0 0 if $version < $kafka_min_version
617 0 0 if $self->debug_level
628 0 0 unless $self->_connectIO($broker)
630 0 0 unless my $sent = $self->_sendIO($broker, $encoded_request)
635 0 0 unless ($encoded_response_ref)
641 0 0 if $self->debug_level
645 0 0 unless defined $decoded_response->{'CorrelationId'} and $decoded_response->{'CorrelationId'} == $CorrelationId
652 0 0 unless $ErrorCode == $Kafka::Connection::ERROR_NO_ERROR
690 0 1 unless not defined $topic or $topic eq '' || defined &_STRING($topic) and not utf8::is_utf8($topic)
693 0 1 unless $self->_update_metadata($topic)
698 1 0 if (defined $topic) { }
717 38 5 unless $self->_is_like_server($server)
729 19 3 unless $self->_is_like_server($server)
732 0 3 unless $self->get_known_servers
737 1 2 unless exists $io_cache->{$server}
739 2 0 if (my $io = $self->_connectIO($server)) { }
750 19 14 unless $self->_is_like_server($server)
755 8 6 unless (exists $io_cache->{$server} and $io = $io_cache->{$server}{'IO'})
821 52 10046 if (not %{$self->{'_metadata'};} or not $self->{'AutoCreateTopicsEnable'} and defined $topic_name and not exists $self->{'_metadata'}{$topic_name})
825 2 49 unless $self->_update_metadata($topic_name)
831 0 10095 unless exists $request->{'CorrelationId'}
833 0 10095 if $self->debug_level
851 10146 0 if ($host_to_send_to eq 'leader') { }
0 0 elsif ($host_to_send_to eq 'group_coordinator') { }
854 0 10146 unless defined $leader
857 0 10146 unless ($server)
864 0 0 if (not %{$self->{'_group_coordinators'};} and defined $group_id)
869 0 0 unless ($server)
879 10142 4 if ($self->_connectIO($server)) { }
887 10124 18 unless (defined $request->{'ApiVersion'})
890 0 10124 unless ($self->_is_IO_connected($server))
898 2 10140 unless ($self->_sendIO($server, $encoded_request))
900 2 0 $io_error ? :
906 4 0 $io_error ? :
909 6 10140 if ($ErrorCode != $Kafka::Connection::ERROR_NO_ERROR)
912 0 6 if ($api_key == $Kafka::Connection::APIKEY_PRODUCE and not $ErrorCode == $Kafka::Connection::ERROR_CANNOT_BIND || $ErrorCode == $Kafka::Connection::ERROR_NO_CONNECTION)
921 5009 5131 if ($api_key == $Kafka::Connection::APIKEY_PRODUCE and $request->{'RequiredAcks'} == $Kafka::Connection::NOT_SEND_ANY_RESPONSE) { }
940 2 5129 unless ($encoded_response_ref)
941 1 1 if ($api_key == $Kafka::Connection::APIKEY_PRODUCE) { }
956 5128 1 if (length $$encoded_response_ref > 4) { }
960 0 5128 if $self->debug_level
972 0 10137 unless $response->{'CorrelationId'} == $request->{'CorrelationId'}
974 21 10116 $api_key == $Kafka::Connection::APIKEY_OFFSET ? :
978 10053 84 if $ErrorCode == $Kafka::Connection::ERROR_NO_ERROR
980 0 84 if ($api_key == $Kafka::Connection::APIKEY_PRODUCE and $ErrorCode == $Kafka::Connection::ERROR_REQUEST_TIMED_OUT)
988 64 20 if (exists $RETRY_ON_ERRORS{$ErrorCode})
1000 0 0 $ErrorCode == $Kafka::Connection::ERROR_NO_ERROR ? :
0 71 if $self->debug_level
1006 3 68 unless $self->_update_metadata($topic_name)
1010 0 68 if ($host_to_send_to eq 'group_coordinator')
1016 17 0 if ($ErrorCode) { }
1017 16 1 $partition_data ? :
1046 0 3 unless defined $topic and $topic eq '' || defined &_STRING($topic) and not utf8::is_utf8($topic)
1048 0 3 unless defined $partition and &isint($partition) and $partition >= 0
1051 0 3 unless (%{$self->{'_metadata'};})
1052 0 0 unless $self->_update_metadata($topic)
1068 0 3 unless ($self->is_server_known($server))
1106 3 1 if (my $error = $self->_io_error($server))
1153 0 372 if $host and $host =~ /^\[(.+)\]$/
1165 0 71 unless my $max_logged_errors = $self->{'MaxLoggedErrors'}
1168 0 71 if scalar @{$self->{'_nonfatal_errors'};} == $max_logged_errors
1171 0 0 defined $error_code && exists $Kafka::Connection::ERROR{$error_code} ? :
1178 0 71 if $self->debug_level
1195 118 132 if (defined $NodeId and $NodeId == $node_id)
1213 213 53 if (defined $server_data->{'NodeId'}) { }
1214 135 78 if ($server_data->{'IO'}) { }
1238 0 0 if $self->debug_level
1249 0 0 if $self->_connectIO($broker) and $self->_sendIO($broker, $encoded_request) and $encoded_response_ref = $self->_receiveIO($broker)
1254 0 0 unless ($encoded_response_ref)
1260 0 0 if $self->debug_level
1264 0 0 unless defined $decoded_response->{'CorrelationId'} and $decoded_response->{'CorrelationId'} == $CorrelationId
1268 0 0 if $decoded_response->{'ErrorCode'}
1295 0 124 if $self->debug_level
1306 118 11 if $self->_connectIO($broker) and $self->_sendIO($broker, $encoded_request) and $encoded_response_ref = $self->_receiveIO($broker)
1311 5 118 unless ($encoded_response_ref)
1317 0 118 if $self->debug_level
1321 0 118 unless defined $decoded_response->{'CorrelationId'} and $decoded_response->{'CorrelationId'} == $CorrelationId
1325 0 118 unless (&_ARRAY($decoded_response->{'Broker'}))
1326 0 0 if ($self->{'AutoCreateTopicsEnable'}) { }
1363 0 118 if ($ErrorCode = $topic_metadata->{'ErrorCode'}) != $Kafka::Connection::ERROR_NO_ERROR
1368 0 118 if ($ErrorCode = $partition_metadata->{'ErrorCode'}) != $Kafka::Connection::ERROR_NO_ERROR and $ErrorCode != $Kafka::Connection::ERROR_REPLICA_NOT_AVAILABLE
1380 0 118 if ($ErrorCode != $Kafka::Connection::ERROR_NO_ERROR)
1381 0 0 if (exists $RETRY_ON_ERRORS{$ErrorCode}) { }
1385 0 0 defined $partition ? :
1400 0 0 if $is_recursive_call
1409 0 0 if $self->debug_level
1412 0 0 if $self->_update_metadata($topic, 1)
1415 0 0 defined $partition ? :
1424 0 654 if is_ipv6($host)
1433 16 4 if ($server_data->{'IO'})
1438 20 0 if (&blessed($error) and $error->isa('Kafka::Exception')) { }
1439 1 19 if ($error->code == $Kafka::Connection::ERROR_MISMATCH_ARGUMENT or $error->code == $Kafka::Connection::ERROR_INCOMPATIBLE_HOST_IP_VERSION)
1454 19 0 if (my $server_data = $self->{'_IO_cache'}{$server})
1462 0 10124 unless my $server_data = $self->{'_IO_cache'}{$server}
1470 0 10278 unless my $server_data = $self->{'_IO_cache'}{$server}
1473 114 10164 unless ($server_data->{'IO'})
1487 4 110 if (defined $error)
1498 0 15524 unless my $server_data = $self->{'_IO_cache'}{$server}
1503 0 15524 unless $server_data->{'IO'}
1519 11 10261 if (defined $error)
1534 5247 0 if ($response_ref and length $$response_ref == 4)
1543 5 5247 if (defined $error)
1554 24 0 if (my $server_data = $self->{'_IO_cache'}{$server})
1555 13 11 if (my $io = $server_data->{'IO'})
1557 13 0 unless $keep_error
1569 40 222 unless (defined $server and defined &_STRING($server) and not utf8::is_utf8($server))
1578 50 172 unless (is_hostname($host) || is_ipv4($host) || is_ipv6($host) and $port)