line |
true |
false |
branch |
448
|
8 |
224 |
unless defined $self->{'host'} and $self->{'host'} eq '' || defined &_STRING($self->{'host'}) and not utf8::is_utf8($self->{'host'}) |
450
|
15 |
209 |
unless &_POSINT($self->{'port'}) |
452
|
0 |
199 |
unless not defined $self->{'timeout'} or defined &_NUMBER($self->{'timeout'}) and int 1000 * $self->{'timeout'} >= 1 and int $self->{'timeout'} * 1000 <= $Kafka::Connection::MAX_INT32 |
454
|
16 |
183 |
unless &_ARRAY0($self->{'broker_list'}) |
456
|
15 |
168 |
unless &_POSINT($self->{'SEND_MAX_ATTEMPTS'}) |
458
|
15 |
153 |
unless &_POSINT($self->{'RETRY_BACKOFF'}) |
460
|
0 |
153 |
unless defined &_NONNEGINT($self->{'MaxLoggedErrors'}) |
463
|
2 |
2 |
unless not defined $ip_version or defined &_NONNEGINT($ip_version) and $ip_version == $Kafka::Connection::IP_V4 || $ip_version == $Kafka::Connection::IP_V6 |
501
|
150 |
1 |
$self->{'host'} ? : |
502
|
19 |
150 |
unless $self->_is_like_server($server) |
514
|
1 |
131 |
unless keys %$IO_cache |
545
|
0 |
10124 |
unless defined $server_metadata |
550
|
10028 |
96 |
if defined $server_metadata->{'_api_versions'} |
557
|
96 |
0 |
if $self->{'dont_load_supported_api_versions'} |
572
|
0 |
0 |
if (defined $error) |
573
|
0 |
0 |
if (&blessed($error) and $error->isa('Kafka::Exception')) { } |
574
|
0 |
0 |
if ($error->code == $Kafka::Connection::ERROR_MISMATCH_ARGUMENT) |
595
|
0 |
0 |
if $version > $implemented_max_version |
597
|
0 |
0 |
if $version < $kafka_min_version |
617
|
0 |
0 |
if $self->debug_level |
628
|
0 |
0 |
unless $self->_connectIO($broker) |
630
|
0 |
0 |
unless my $sent = $self->_sendIO($broker, $encoded_request) |
635
|
0 |
0 |
unless ($encoded_response_ref) |
641
|
0 |
0 |
if $self->debug_level |
645
|
0 |
0 |
unless defined $decoded_response->{'CorrelationId'} and $decoded_response->{'CorrelationId'} == $CorrelationId |
652
|
0 |
0 |
unless $ErrorCode == $Kafka::Connection::ERROR_NO_ERROR |
690
|
0 |
1 |
unless not defined $topic or $topic eq '' || defined &_STRING($topic) and not utf8::is_utf8($topic) |
693
|
0 |
1 |
unless $self->_update_metadata($topic) |
698
|
1 |
0 |
if (defined $topic) { } |
717
|
38 |
5 |
unless $self->_is_like_server($server) |
729
|
19 |
3 |
unless $self->_is_like_server($server) |
732
|
0 |
3 |
unless $self->get_known_servers |
737
|
1 |
2 |
unless exists $io_cache->{$server} |
739
|
2 |
0 |
if (my $io = $self->_connectIO($server)) { } |
750
|
19 |
14 |
unless $self->_is_like_server($server) |
755
|
8 |
6 |
unless (exists $io_cache->{$server} and $io = $io_cache->{$server}{'IO'}) |
821
|
52 |
10046 |
if (not %{$self->{'_metadata'};} or not $self->{'AutoCreateTopicsEnable'} and defined $topic_name and not exists $self->{'_metadata'}{$topic_name}) |
825
|
2 |
49 |
unless $self->_update_metadata($topic_name) |
831
|
0 |
10095 |
unless exists $request->{'CorrelationId'} |
833
|
0 |
10095 |
if $self->debug_level |
851
|
10146 |
0 |
if ($host_to_send_to eq 'leader') { } |
|
0 |
0 |
elsif ($host_to_send_to eq 'group_coordinator') { } |
854
|
0 |
10146 |
unless defined $leader |
857
|
0 |
10146 |
unless ($server) |
864
|
0 |
0 |
if (not %{$self->{'_group_coordinators'};} and defined $group_id) |
869
|
0 |
0 |
unless ($server) |
879
|
10142 |
4 |
if ($self->_connectIO($server)) { } |
887
|
10124 |
18 |
unless (defined $request->{'ApiVersion'}) |
890
|
0 |
10124 |
unless ($self->_is_IO_connected($server)) |
898
|
2 |
10140 |
unless ($self->_sendIO($server, $encoded_request)) |
900
|
2 |
0 |
$io_error ? : |
906
|
4 |
0 |
$io_error ? : |
909
|
6 |
10140 |
if ($ErrorCode != $Kafka::Connection::ERROR_NO_ERROR) |
912
|
0 |
6 |
if ($api_key == $Kafka::Connection::APIKEY_PRODUCE and not $ErrorCode == $Kafka::Connection::ERROR_CANNOT_BIND || $ErrorCode == $Kafka::Connection::ERROR_NO_CONNECTION) |
921
|
5009 |
5131 |
if ($api_key == $Kafka::Connection::APIKEY_PRODUCE and $request->{'RequiredAcks'} == $Kafka::Connection::NOT_SEND_ANY_RESPONSE) { } |
940
|
2 |
5129 |
unless ($encoded_response_ref) |
941
|
1 |
1 |
if ($api_key == $Kafka::Connection::APIKEY_PRODUCE) { } |
956
|
5128 |
1 |
if (length $$encoded_response_ref > 4) { } |
960
|
0 |
5128 |
if $self->debug_level |
972
|
0 |
10137 |
unless $response->{'CorrelationId'} == $request->{'CorrelationId'} |
974
|
21 |
10116 |
$api_key == $Kafka::Connection::APIKEY_OFFSET ? : |
978
|
10053 |
84 |
if $ErrorCode == $Kafka::Connection::ERROR_NO_ERROR |
980
|
0 |
84 |
if ($api_key == $Kafka::Connection::APIKEY_PRODUCE and $ErrorCode == $Kafka::Connection::ERROR_REQUEST_TIMED_OUT) |
988
|
64 |
20 |
if (exists $RETRY_ON_ERRORS{$ErrorCode}) |
1000
|
0 |
0 |
$ErrorCode == $Kafka::Connection::ERROR_NO_ERROR ? : |
|
0 |
71 |
if $self->debug_level |
1006
|
3 |
68 |
unless $self->_update_metadata($topic_name) |
1010
|
0 |
68 |
if ($host_to_send_to eq 'group_coordinator') |
1016
|
17 |
0 |
if ($ErrorCode) { } |
1017
|
16 |
1 |
$partition_data ? : |
1046
|
0 |
3 |
unless defined $topic and $topic eq '' || defined &_STRING($topic) and not utf8::is_utf8($topic) |
1048
|
0 |
3 |
unless defined $partition and &isint($partition) and $partition >= 0 |
1051
|
0 |
3 |
unless (%{$self->{'_metadata'};}) |
1052
|
0 |
0 |
unless $self->_update_metadata($topic) |
1068
|
0 |
3 |
unless ($self->is_server_known($server)) |
1106
|
3 |
1 |
if (my $error = $self->_io_error($server)) |
1153
|
0 |
372 |
if $host and $host =~ /^\[(.+)\]$/ |
1165
|
0 |
71 |
unless my $max_logged_errors = $self->{'MaxLoggedErrors'} |
1168
|
0 |
71 |
if scalar @{$self->{'_nonfatal_errors'};} == $max_logged_errors |
1171
|
0 |
0 |
defined $error_code && exists $Kafka::Connection::ERROR{$error_code} ? : |
1178
|
0 |
71 |
if $self->debug_level |
1195
|
118 |
132 |
if (defined $NodeId and $NodeId == $node_id) |
1213
|
213 |
53 |
if (defined $server_data->{'NodeId'}) { } |
1214
|
135 |
78 |
if ($server_data->{'IO'}) { } |
1238
|
0 |
0 |
if $self->debug_level |
1249
|
0 |
0 |
if $self->_connectIO($broker) and $self->_sendIO($broker, $encoded_request) and $encoded_response_ref = $self->_receiveIO($broker) |
1254
|
0 |
0 |
unless ($encoded_response_ref) |
1260
|
0 |
0 |
if $self->debug_level |
1264
|
0 |
0 |
unless defined $decoded_response->{'CorrelationId'} and $decoded_response->{'CorrelationId'} == $CorrelationId |
1268
|
0 |
0 |
if $decoded_response->{'ErrorCode'} |
1295
|
0 |
124 |
if $self->debug_level |
1306
|
118 |
11 |
if $self->_connectIO($broker) and $self->_sendIO($broker, $encoded_request) and $encoded_response_ref = $self->_receiveIO($broker) |
1311
|
5 |
118 |
unless ($encoded_response_ref) |
1317
|
0 |
118 |
if $self->debug_level |
1321
|
0 |
118 |
unless defined $decoded_response->{'CorrelationId'} and $decoded_response->{'CorrelationId'} == $CorrelationId |
1325
|
0 |
118 |
unless (&_ARRAY($decoded_response->{'Broker'})) |
1326
|
0 |
0 |
if ($self->{'AutoCreateTopicsEnable'}) { } |
1363
|
0 |
118 |
if ($ErrorCode = $topic_metadata->{'ErrorCode'}) != $Kafka::Connection::ERROR_NO_ERROR |
1368
|
0 |
118 |
if ($ErrorCode = $partition_metadata->{'ErrorCode'}) != $Kafka::Connection::ERROR_NO_ERROR and $ErrorCode != $Kafka::Connection::ERROR_REPLICA_NOT_AVAILABLE |
1380
|
0 |
118 |
if ($ErrorCode != $Kafka::Connection::ERROR_NO_ERROR) |
1381
|
0 |
0 |
if (exists $RETRY_ON_ERRORS{$ErrorCode}) { } |
1385
|
0 |
0 |
defined $partition ? : |
1400
|
0 |
0 |
if $is_recursive_call |
1409
|
0 |
0 |
if $self->debug_level |
1412
|
0 |
0 |
if $self->_update_metadata($topic, 1) |
1415
|
0 |
0 |
defined $partition ? : |
1424
|
0 |
654 |
if is_ipv6($host) |
1433
|
16 |
4 |
if ($server_data->{'IO'}) |
1438
|
20 |
0 |
if (&blessed($error) and $error->isa('Kafka::Exception')) { } |
1439
|
1 |
19 |
if ($error->code == $Kafka::Connection::ERROR_MISMATCH_ARGUMENT or $error->code == $Kafka::Connection::ERROR_INCOMPATIBLE_HOST_IP_VERSION) |
1454
|
19 |
0 |
if (my $server_data = $self->{'_IO_cache'}{$server}) |
1462
|
0 |
10124 |
unless my $server_data = $self->{'_IO_cache'}{$server} |
1470
|
0 |
10278 |
unless my $server_data = $self->{'_IO_cache'}{$server} |
1473
|
114 |
10164 |
unless ($server_data->{'IO'}) |
1487
|
4 |
110 |
if (defined $error) |
1498
|
0 |
15524 |
unless my $server_data = $self->{'_IO_cache'}{$server} |
1503
|
0 |
15524 |
unless $server_data->{'IO'} |
1519
|
11 |
10261 |
if (defined $error) |
1534
|
5247 |
0 |
if ($response_ref and length $$response_ref == 4) |
1543
|
5 |
5247 |
if (defined $error) |
1554
|
24 |
0 |
if (my $server_data = $self->{'_IO_cache'}{$server}) |
1555
|
13 |
11 |
if (my $io = $server_data->{'IO'}) |
1557
|
13 |
0 |
unless $keep_error |
1569
|
40 |
222 |
unless (defined $server and defined &_STRING($server) and not utf8::is_utf8($server)) |
1578
|
50 |
172 |
unless (is_hostname($host) || is_ipv4($host) || is_ipv6($host) and $port) |