File Coverage

lib/Kafka/IO.pm
Criterion Covered Total %
statement 261 332 78.6
branch 80 170 47.0
condition 47 128 36.7
subroutine 34 37 91.8
pod 4 4 100.0
total 426 671 63.4


line stmt bran cond sub pod time code
1             package Kafka::IO;
2              
3             =head1 NAME
4              
5             Kafka::IO - Interface to network communication with the Apache Kafka server.
6              
7             =head1 VERSION
8              
9             This documentation refers to C<Kafka::IO> version 1.07 .
10              
11             =cut
12              
13              
14              
15 15     15   121107 use 5.010;
  15         42  
16 15     15   70 use strict;
  15         19  
  15         282  
17 15     15   56 use warnings;
  15         33  
  15         747  
18              
19              
20              
21             our $DEBUG = 0;
22              
23             our $VERSION = '1.07';
24              
25              
26              
27 15     15   74 use Carp;
  15         20  
  15         657  
28 15     15   65 use Config;
  15         41  
  15         523  
29 15     15   58 use Const::Fast;
  15         29  
  15         85  
30 15         720 use Data::Validate::Domain qw(
31             is_hostname
32 15     15   3783 );
  15         104595  
33 15         1098 use Data::Validate::IP qw(
34             is_ipv4
35             is_ipv6
36 15     15   4025 );
  15         271717  
37 15         1077 use Errno qw(
38             EAGAIN
39             ECONNRESET
40             EINTR
41             EWOULDBLOCK
42             ETIMEDOUT
43 15     15   1774 );
  15         6291  
44 15     15   81 use Fcntl;
  15         24  
  15         2939  
45 15     15   4504 use IO::Select;
  15         18212  
  15         579  
46 15         624 use Params::Util qw(
47             _STRING
48 15     15   1064 );
  15         6078  
49 15         147 use POSIX qw(
50             ceil
51 15     15   73 );
  15         23  
52 15         742 use Scalar::Util qw(
53             dualvar
54 15     15   8347 );
  15         22  
55 15         1439 use Socket qw(
56             AF_INET
57             AF_INET6
58             IPPROTO_TCP
59             MSG_DONTWAIT
60             MSG_PEEK
61             NI_NUMERICHOST
62             NIx_NOSERV
63             PF_INET
64             PF_INET6
65             SOCK_STREAM
66             SOL_SOCKET
67             SO_ERROR
68             SO_RCVTIMEO
69             SO_SNDTIMEO
70             getaddrinfo
71             getnameinfo
72             inet_aton
73             inet_pton
74             inet_ntop
75             pack_sockaddr_in
76             pack_sockaddr_in6
77 15     15   104 );
  15         25  
78 15         655 use Sys::SigAction qw(
79             set_sig_handler
80 15     15   3875 );
  15         34952  
81 15     15   89 use Time::HiRes ();
  15         27  
  15         215  
82 15     15   957 use Try::Tiny;
  15         4808  
  15         776  
83              
84 15         1849 use Kafka qw(
85             $ERROR_CANNOT_BIND
86             $ERROR_CANNOT_RECV
87             $ERROR_CANNOT_SEND
88             $ERROR_MISMATCH_ARGUMENT
89             $ERROR_INCOMPATIBLE_HOST_IP_VERSION
90             $ERROR_NO_CONNECTION
91             $IP_V4
92             $IP_V6
93             $KAFKA_SERVER_PORT
94             $REQUEST_TIMEOUT
95 15     15   76 );
  15         27  
96 15     15   2569 use Kafka::Exceptions;
  15         35  
  15         626  
97 15         33374 use Kafka::Internals qw(
98             $MAX_SOCKET_REQUEST_BYTES
99             debug_level
100             format_message
101 15     15   77 );
  15         25  
102              
103              
104              
105             =head1 SYNOPSIS
106              
107             use 5.010;
108             use strict;
109             use warnings;
110              
111             use Scalar::Util qw(
112             blessed
113             );
114             use Try::Tiny;
115              
116             use Kafka::IO;
117              
118             my $io;
119             try {
120             $io = Kafka::IO->new( host => 'localhost' );
121             } catch {
122             my $error = $_;
123             if ( blessed( $error ) && $error->isa( 'Kafka::Exception' ) ) {
124             warn 'Error: (', $error->code, ') ', $error->message, "\n";
125             exit;
126             } else {
127             die $error;
128             }
129             };
130              
131             # Closes and cleans up
132             $io->close;
133             undef $io;
134              
135             =head1 DESCRIPTION
136              
137             This module is private and should not be used directly.
138              
139             In order to achieve better performance, methods of this module do not
140             perform arguments validation.
141              
142             The main features of the C<Kafka::IO> class are:
143              
144             =over 3
145              
146             =item *
147              
148             Provides an object oriented API for communication with Kafka.
149              
150             =item *
151              
152             This class allows you to create Kafka 0.9+ clients.
153              
154             =back
155              
156             =cut
157              
158             # Hard limit of IO operation retry attempts, to prevent high CPU usage in IO retry loop
159             const my $MAX_RETRIES => 30;
160              
161             our $_hdr;
162              
163             #-- constructor ----------------------------------------------------------------
164              
165             =head2 CONSTRUCTOR
166              
167             =head3 C<new>
168              
169             Establishes TCP connection to given host and port, creates and returns C<Kafka::IO> IO object.
170              
171             C<new()> takes arguments in key-value pairs. The following arguments are currently recognized:
172              
173             =over 3
174              
175             =item C<host =E<gt> $host>
176              
177             C<$host> is Kafka host to connect to. It can be a host name or an IP-address in
178             IPv4 or IPv6 form (for example '127.0.0.1', '0:0:0:0:0:0:0:1' or '::1').
179              
180             =item C<port =E<gt> $port>
181              
182             Optional, default = C<$KAFKA_SERVER_PORT>.
183              
184             C<$port> is an integer attribute denoting the port number used to access Apache Kafka.
185              
186             C<$KAFKA_SERVER_PORT> is the default Apache Kafka server port that can be imported
187             from the L<Kafka> module.
188              
189             =item C<timeout =E<gt> $timeout>
190              
191             C<$REQUEST_TIMEOUT> is the default timeout that can be imported from the L<Kafka> module.
192              
193             Special behavior when C<timeout> is set to C<undef>:
194              
195             =back
196              
197             =over 3
198              
199             =item *
200              
201             Alarms are not used internally (namely when performing C<gethostbyname>).
202              
203             =item *
204              
205             Default C<$REQUEST_TIMEOUT> is used for the rest of IO operations.
206              
207             =back
208              
209             =over 3
210              
211             =item C<ip_version =E<gt> $ip_version>
212              
213             Force version of IP protocol for resolving host name (or interpretation of passed address).
214              
215             Optional, undefined by default, which works in the following way: version of IP address
216             is detected automatically, host name is resolved into IPv4 address.
217              
218             See description of L<$IP_V4|Kafka::IO/$IP_V4>, L<$IP_V6|Kafka::IO/$IP_V6>
219             in C<Kafka> L<EXPORT|Kafka/EXPORT>.
220              
221             =back
222              
223             =cut
224             sub new {
225 13     13 1 14007 my ( $class, %p ) = @_;
226              
227 13         123 my $self = bless {
228             host => '',
229             timeout => $REQUEST_TIMEOUT,
230             port => $KAFKA_SERVER_PORT,
231             ip_version => undef,
232             af => '', # Address family constant
233             pf => '', # Protocol family constant
234             ip => '', # Human-readable textual representation of the ip address
235             }, $class;
236              
237 13   66     188 exists $p{$_} and $self->{$_} = $p{$_} foreach keys %$self;
238              
239             # we trust it: make it untainted
240 13         113 ( $self->{host} ) = $self->{host} =~ /\A(.+)\z/;
241 13         66 ( $self->{port} ) = $self->{port} =~ /\A(.+)\z/;
242              
243 13         54 $self->{socket} = undef;
244 13         34 $self->{_io_select} = undef;
245 13         28 my $error;
246             try {
247 13     13   1428 $self->_connect();
248             } catch {
249 4     4   1908 $error = $_;
250 13         144 };
251              
252 13 100       258 $self->_error( $ERROR_CANNOT_BIND, format_message("Kafka::IO(%s:%s)->new: %s", $self->{host}, $self->{port}, $error ) )
253             if defined $error
254             ;
255 9         478 return $self;
256             }
257              
258             #-- public attributes ----------------------------------------------------------
259              
260             =head2 METHODS
261              
262             The following methods are provided by C<Kafka::IO> class:
263              
264             =cut
265              
266             =head3 C<< send( $message <, $timeout> ) >>
267              
268             Sends a C<$message> to Kafka.
269              
270             The argument must be a bytes string.
271              
272             Use optional C<$timeout> argument to override default timeout for this request only.
273              
274             Returns the number of characters sent.
275              
276             =cut
277             sub send {
278 1     1 1 66 my ( $self, $message, $timeout ) = @_;
279 1 50       7 $self->_error( $ERROR_MISMATCH_ARGUMENT, '->send' )
280             unless defined( _STRING( $message ) )
281             ;
282 1         3 my $length = length( $message );
283 1 50       5 $self->_error( $ERROR_MISMATCH_ARGUMENT, '->send' )
284             unless $length <= $MAX_SOCKET_REQUEST_BYTES
285             ;
286 1 50 33     8 $timeout = $self->{timeout} // $REQUEST_TIMEOUT unless defined $timeout;
287 1 50       5 $self->_error( $ERROR_MISMATCH_ARGUMENT, '->receive' )
288             unless $timeout > 0
289             ;
290 1         3 my $select = $self->{_io_select};
291 1 50       4 $self->_error( $ERROR_NO_CONNECTION, 'Attempt to work with a closed socket' ) unless $select;
292              
293 1 50       6 $self->_debug_msg( $message, 'Request to', 'green' )
294             if $self->debug_level >= 2
295             ;
296 1         3 my $sent = 0;
297              
298 1         7 my $started = Time::HiRes::time();
299 1         3 my $until = $started + $timeout;
300              
301 1         2 my $error_code;
302             my $errno;
303 1         3 my $retries = 0;
304 1         3 my $interrupts = 0;
305 1   66     21 ATTEMPT: while ( $sent < $length && $retries++ < $MAX_RETRIES ) {
306 1         6 my $remaining_time = $until - Time::HiRes::time();
307 1 50       5 last ATTEMPT if $remaining_time <= 0; # timeout expired
308              
309 1         4 undef $!;
310 1         9 my $can_write = $select->can_write( $remaining_time );
311 1         64 $errno = $!;
312 1 50       83 if ( $errno ) {
313 0 0       0 if ( $errno == EINTR ) {
314 0         0 undef $errno;
315 0         0 --$retries; # this attempt does not count
316 0         0 ++$interrupts;
317 0         0 next ATTEMPT;
318             }
319              
320 0         0 $self->close;
321              
322 0         0 last ATTEMPT;
323             }
324              
325 1 50       4 if ( $can_write ) {
326             # check for EOF on the first attempt only
327 1 50 33     8 if ( $retries == 1 && $self->_is_close_wait ) {
328 0         0 $self->close;
329 0         0 $error_code = $ERROR_NO_CONNECTION;
330 0         0 last ATTEMPT;
331             }
332              
333 1         24 undef $!;
334 1         89 my $wrote = CORE::send( $self->{socket}, $message, MSG_DONTWAIT );
335 1         6 $errno = $!;
336              
337 1 50 33     13 if( defined $wrote && $wrote > 0 ) {
338 1         3 $sent += $wrote;
339 1 50       4 if ( $sent < $length ) {
340             # remove written data from message
341 0         0 $message = substr( $message, $wrote );
342             }
343             }
344              
345 1 50       3 if( $errno ) {
346 0 0 0     0 if( $errno == EINTR ) {
    0 0        
      0        
347 0         0 undef $errno;
348 0         0 --$retries; # this attempt does not count
349 0         0 ++$interrupts;
350 0         0 next ATTEMPT;
351             } elsif (
352             $errno != EAGAIN
353             && $errno != EWOULDBLOCK
354             ## on freebsd, if we got ECONNRESET, it's a timeout from the other side
355             && !( $errno == ECONNRESET && $^O eq 'freebsd' )
356             ) {
357 0         0 $self->close;
358 0         0 last ATTEMPT;
359             }
360             }
361              
362 1 50       6 last ATTEMPT unless defined $wrote;
363             }
364             }
365              
366 1 50 33     16 unless( !$errno && defined( $sent ) && $sent == $length )
      33        
367             {
368             $self->_error(
369             $error_code // $ERROR_CANNOT_SEND,
370             format_message( "Kafka::IO(%s)->send: ERRNO=%s ERROR='%s' (length=%s, sent=%s, timeout=%s, retries=%s, interrupts=%s, secs=%.6f)",
371             $self->{host},
372 0   0     0 ( $errno // 0 ) + 0,
      0        
      0        
373             ( $errno // '' ) . '',
374             $length,
375             $sent,
376             $timeout,
377             $retries,
378             $interrupts,
379             Time::HiRes::time() - $started,
380             )
381             );
382             }
383              
384 1         8 return $sent;
385             }
386              
387             =head3 C<< receive( $length <, $timeout> ) >>
388              
389             Receives a message up to C<$length> size from Kafka.
390              
391             C<$length> argument must be a positive number.
392              
393             Use optional C<$timeout> argument to override default timeout for this call only.
394              
395             Returns a reference to the received message.
396              
397             =cut
398             sub receive {
399 1     1 1 1484 my ( $self, $length, $timeout ) = @_;
400 1 50       5 $self->_error( $ERROR_MISMATCH_ARGUMENT, '->receive' )
401             unless $length > 0
402             ;
403 1 50 33     7 $timeout = $self->{timeout} // $REQUEST_TIMEOUT unless defined $timeout;
404 1 50       6 $self->_error( $ERROR_MISMATCH_ARGUMENT, '->receive' )
405             unless $timeout > 0
406             ;
407 1         3 my $select = $self->{_io_select};
408 1 50       4 $self->_error( $ERROR_NO_CONNECTION, 'Attempt to work with a closed socket' ) unless $select;
409              
410 1         9 my $message = '';
411 1         2 my $len_to_read = $length;
412              
413 1         5 my $started = Time::HiRes::time();
414 1         3 my $until = $started + $timeout;
415              
416 1         3 my $error_code;
417             my $errno;
418 1         2 my $retries = 0;
419 1         3 my $interrupts = 0;
420 1   66     12 ATTEMPT: while ( $len_to_read > 0 && $retries++ < $MAX_RETRIES ) {
421 1         7 my $remaining_time = $until - Time::HiRes::time();
422 1 50       5 last if $remaining_time <= 0; # timeout expired
423              
424 1         2 undef $!;
425 1         6 my $can_read = $select->can_read( $remaining_time );
426 1         43 $errno = $!;
427 1 50       4 if ( $errno ) {
428 0 0       0 if ( $errno == EINTR ) {
429 0         0 undef $errno;
430 0         0 --$retries; # this attempt does not count
431 0         0 ++$interrupts;
432 0         0 next ATTEMPT;
433             }
434              
435 0         0 $self->close;
436              
437 0         0 last ATTEMPT;
438             }
439              
440 1 50       3 if ( $can_read ) {
441 1         7 my $buf = '';
442 1         3 undef $!;
443 1         14 my $from_recv = CORE::recv( $self->{socket}, $buf, $len_to_read, MSG_DONTWAIT );
444 1         4 $errno = $!;
445              
446 1 50 33     13 if ( defined( $from_recv ) && length( $buf ) ) {
447 1         3 $message .= $buf;
448 1         2 $len_to_read = $length - length( $message );
449 1         3 --$retries; # this attempt was successful, don't count as a retry
450             }
451 1 50       4 if ( $errno ) {
452 0 0 0     0 if ( $errno == EINTR ) {
    0 0        
      0        
453 0         0 undef $errno;
454 0         0 --$retries; # this attempt does not count
455 0         0 ++$interrupts;
456 0         0 next ATTEMPT;
457             } elsif (
458             $errno != EAGAIN
459             && $errno != EWOULDBLOCK
460             ## on freebsd, if we got ECONNRESET, it's a timeout from the other side
461             && !( $errno == ECONNRESET && $^O eq 'freebsd' )
462             ) {
463 0         0 $self->close;
464 0         0 last ATTEMPT;
465             }
466             }
467              
468 1 50       5 if ( length( $buf ) == 0 ) {
469 0 0 0     0 if( defined( $from_recv ) && ! $errno ) {
470             # no error and nothing received with select returning "can read" means EOF: other side closed socket
471 0 0       0 $self->_debug_msg( 'EOF on receive attempt, closing socket' )
472             if $self->debug_level;
473 0         0 $self->close;
474              
475 0 0       0 if( length( $message ) == 0 ) {
476             # we did not receive anything yet, so we may (in some cases) reconnect and try again
477 0         0 $error_code = $ERROR_NO_CONNECTION;
478             }
479              
480 0         0 last ATTEMPT;
481             }
482             # we did not read anything on this attempt: wait a bit before the next one; should not happen, but just in case...
483 0 0       0 if ( my $remaining_attempts = $MAX_RETRIES - $retries ) {
484 0         0 $remaining_time = $until - Time::HiRes::time();
485 0         0 my $micro_seconds = int( $remaining_time * 1e6 / $remaining_attempts );
486 0 0       0 if ( $micro_seconds > 0 ) {
487 0 0       0 $micro_seconds = 250_000 if $micro_seconds > 250_000; # prevent long sleeps if total remaining time is big
488 0 0       0 $self->_debug_msg( format_message( 'sleeping (remaining attempts %d, time %.6f): %d microseconds', $remaining_attempts, $remaining_time, $micro_seconds ) )
489             if $self->debug_level;
490 0         0 Time::HiRes::usleep( $micro_seconds );
491             }
492             }
493             }
494             }
495             }
496              
497 1 50 33     12 unless( !$errno && length( $message ) >= $length )
498             {
499             $self->_error(
500             $error_code // $ERROR_CANNOT_RECV,
501             format_message( "Kafka::IO(%s)->receive: ERRNO=%s ERROR='%s' (length=%s, received=%s, timeout=%s, retries=%s, interrupts=%s, secs=%.6f)",
502             $self->{host},
503 0   0     0 ( $errno // 0 ) + 0,
      0        
      0        
504             ( $errno // '' ) . '',
505             $length,
506             length( $message ),
507             $timeout,
508             $retries,
509             $interrupts,
510             Time::HiRes::time() - $started,
511             ),
512             );
513             }
514 1 50       5 $self->_debug_msg( $message, 'Response from', 'yellow' )
515             if $self->debug_level >= 2;
516              
517             # returns tainted data
518 1         7 return \$message;
519             }
520              
521             =head3 C<close>
522              
523             Closes connection to Kafka server.
524             Returns true if those operations succeed and if no error was reported by any PerlIO layer.
525              
526             =cut
527             sub close {
528 1     1 1 3 my ( $self ) = @_;
529              
530 1         3 my $ret = 1;
531 1 50       6 if ( $self->{socket} ) {
532 1         49 $ret = CORE::close( $self->{socket} );
533 1         5 $self->{socket} = undef;
534 1         7 $self->{_io_select} = undef;
535             }
536              
537 1         5 return $ret;
538             }
539              
540             sub _is_close_wait {
541 1     1   3 my ( $self ) = @_;
542 1 50 33     13 return 1 unless $self->{socket} && $self->{_io_select}; # closed already
543             # http://stefan.buettcher.org/cs/conn_closed.html
544             # socket is open; check if we can read, and if we can but recv() cannot peek, it means we got EOF
545 1 50       6 return unless $self->{_io_select}->can_read( 0 ); # we cannot read, but may be able to write
546 0         0 my $buf = '';
547 0         0 undef $!;
548 0         0 my $status = CORE::recv( $self->{socket}, $buf, 1, MSG_DONTWAIT | MSG_PEEK ); # peek, do not remove data from queue
549             # EOF when there is no error, status is defined, but result is empty
550 0   0     0 return ! $! && defined $status && length( $buf ) == 0;
551             }
552              
553             # The method verifies if we can connect to a Kafka broker.
554             # This is evil: opens and immediately closes a NEW connection so do not use unless there is a strong reason for it.
555             sub _is_alive {
556 3     3   493674 my ( $self ) = @_;
557              
558 3         12 my $socket = $self->{socket};
559 3 100       16 return unless $socket;
560              
561 2         97 socket( my $tmp_socket, $self->{pf}, SOCK_STREAM, IPPROTO_TCP );
562 2         206 my $is_alive = connect( $tmp_socket, getpeername( $socket ) );
563 2         42 CORE::close( $tmp_socket );
564              
565 2         24 return $is_alive;
566             }
567              
568             #-- private attributes ---------------------------------------------------------
569              
570             #-- private methods ------------------------------------------------------------
571              
572             # You need to have access to Kafka instance and be able to connect through TCP.
573             # uses http://devpit.org/wiki/Connect%28%29_with_timeout_%28in_Perl%29
574             sub _connect {
575 13     13   29 my ( $self ) = @_;
576              
577 13         34 $self->{socket} = undef;
578 13         39 $self->{_io_select} = undef;
579              
580 13         23 my $name = $self->{host};
581 13         31 my $port = $self->{port};
582 13         27 my $timeout = $self->{timeout};
583              
584 13         26 my $ip = '';
585 13 100       46 if ( $self->_get_family( $name ) ) {
586 2         6 $ip = $self->{ip} = $name;
587             } else {
588 10 50       22 if ( defined $timeout ) {
589 10         17 my $remaining;
590 10         18 my $start = time();
591              
592 10 50       39 $self->_debug_msg( format_message( "name = '%s', number of wallclock seconds = %s", $name, ceil( $timeout ) ) )
593             if $self->debug_level;
594              
595             # DNS lookup.
596 10         26 local $@;
597 0     0   0 my $h = set_sig_handler( 'ALRM', sub { die 'alarm clock restarted' },
598             {
599 10         94 mask => [ 'ALRM' ],
600             safe => 0, # perl 5.8+ uses safe signal delivery so we need unsafe signal for timeout to work
601             }
602             );
603 10         1453 eval {
604 10         73 $remaining = alarm( ceil( $timeout ) );
605 10         39 $ip = $self->_gethostbyname( $name );
606 10         15000252 alarm 0;
607             };
608 10         26 alarm 0; # race condition protection
609 10         33 my $error = $@;
610 10         69 undef $h;
611              
612 10 50       643 $self->_debug_msg( format_message( "_connect: ip = '%s', error = '%s', \$? = %s, \$! = '%s'", $ip, $error, $?, $! ) )
613             if $self->debug_level;
614              
615 10 50       35 die $error if $error;
616 10 100       32 die( format_message( "gethostbyname %s: \$? = '%s', \$! = '%s'\n", $name, $?, $! ) ) unless $ip;
617              
618 8         35 my $elapsed = time() - $start;
619             # $SIG{ALRM} restored automatically, but we need to restart previous alarm manually
620              
621 8 50       22 $self->_debug_msg( format_message( '_connect: %s (remaining) - %s (elapsed) = %s', $remaining, $elapsed, $remaining - $elapsed ) )
622             if $self->debug_level;
623 8 100       28 if ( $remaining ) {
624 2 100       10 if ( $remaining - $elapsed > 0 ) {
625 1 50       4 $self->_debug_msg( '_connect: remaining - elapsed > 0 (to alarm restart)' )
626             if $self->debug_level;
627 1         6 alarm( ceil( $remaining - $elapsed ) );
628             } else {
629 1 50       6 $self->_debug_msg( '_connect: remaining - elapsed < 0 (to alarm function call)' )
630             if $self->debug_level;
631             # $SIG{ALRM}->();
632 1         62 kill ALRM => $$;
633             }
634 2 50       13 $self->_debug_msg( "_connect: after alarm 'recalled'" )
635             if $self->debug_level;
636             }
637             } else {
638 0         0 $ip = $self->_gethostbyname( $name );
639 0 0       0 die( format_message( "could not resolve host name to IP address: %s\n", $name ) ) unless $ip;
640             }
641             }
642              
643             # Create socket.
644 10 50       905 socket( my $connection, $self->{pf}, SOCK_STREAM, scalar getprotobyname( 'tcp' ) ) or die( "socket: $!\n" );
645              
646             # Set autoflushing.
647 10         63 my $file_handle = select( $connection ); $| = 1; select $file_handle;
  10         37  
  10         34  
648              
649             # Set FD_CLOEXEC.
650 10 50       49 my $flags = fcntl( $connection, F_GETFL, 0 ) or die "fcntl: $!\n";
651 10 50       46 fcntl( $connection, F_SETFL, $flags | FD_CLOEXEC ) or die "fnctl: $!\n";
652              
653 10 50       35 $flags = fcntl( $connection, F_GETFL, 0 ) or die "fcntl F_GETFL: $!\n"; # 0 for error, 0e0 for 0.
654 10 50       34 fcntl( $connection, F_SETFL, $flags | O_NONBLOCK ) or die "fcntl F_SETFL O_NONBLOCK: $!\n"; # 0 for error, 0e0 for 0.
655              
656             # Connect returns immediately because of O_NONBLOCK.
657             my $sockaddr = $self->{af} eq AF_INET
658             ? pack_sockaddr_in( $port, inet_aton( $ip ) )
659 10 100       105 : pack_sockaddr_in6( $port, inet_pton( $self->{af}, $ip ) )
660             ;
661 10 100 66     877 connect( $connection, $sockaddr ) || $!{EINPROGRESS} || die( format_message( "connect ip = %s, port = %s: %s\n", $ip, $port, $! ) );
662              
663             # Reset O_NONBLOCK.
664 9 50       265 $flags = fcntl( $connection, F_GETFL, 0 ) or die "fcntl F_GETFL: $!\n"; # 0 for error, 0e0 for 0.
665 9 50       42 fcntl( $connection, F_SETFL, $flags & ~ O_NONBLOCK ) or die "fcntl F_SETFL not O_NONBLOCK: $!\n"; # 0 for error, 0e0 for 0.
666              
667             # Use select() to poll for completion or error. When connect succeeds we can write.
668 9         21 my $vec = '';
669 9         40 vec( $vec, fileno( $connection ), 1 ) = 1;
670 9   33     67 select( undef, $vec, undef, $timeout // $REQUEST_TIMEOUT );
671 9 50       31 unless ( vec( $vec, fileno( $connection ), 1 ) ) {
672             # If no response yet, impose our own timeout.
673 0         0 $! = ETIMEDOUT;
674 0         0 die( format_message( "connect ip = %s, port = %s: %s\n", $ip, $port, $! ) );
675             }
676              
677             # This is how we see whether it connected or there was an error. Document Unix, are you kidding?!
678 9         74 $! = unpack( 'L', getsockopt( $connection, SOL_SOCKET, SO_ERROR ) );
679 9 50       40 die( format_message( "connect ip = %s, port = %s: %s\n", $ip, $port, $! ) ) if $!;
680              
681             # Set timeout on all reads and writes.
682             #
683             # Note the difference between Perl's sysread() and read() calls: sysread()
684             # queries the kernel exactly once, with max delay specified here. read()
685             # queries the kernel repeatedly until there's a read error (such as this
686             # timeout), EOF, or a full buffer. So when using read() with a timeout of one
687             # second, if the remote server sends 1 byte repeatedly at 1 second intervals,
688             # read() will read the whole buffer very slowly and sysread() will return only
689             # the first byte. The print() and syswrite() calls are similarly different.
690             # <> is of course similar to read() but delimited by newlines instead of buffer
691             # sizes.
692 9   33     38 my $timeval = _get_timeval( $timeout // $REQUEST_TIMEOUT );
693 9   50     55 setsockopt( $connection, SOL_SOCKET, SO_SNDTIMEO, $timeval ) // die "setsockopt SOL_SOCKET, SO_SNDTIMEO: $!\n";
694 9   50     36 setsockopt( $connection, SOL_SOCKET, SO_RCVTIMEO, $timeval ) // die "setsockopt SOL_SOCKET, SO_RCVTIMEO: $!\n";
695              
696 9         20 $self->{socket} = $connection;
697 9         83 my $s = $self->{_io_select} = IO::Select->new;
698 9         156 $s->add( $self->{socket} );
699              
700 9         606 return $connection;
701             }
702              
703             # Packing timeval
704             # uses http://trinitum.org/wp/packing-timeval/
705             sub _get_timeval {
706 9     9   19 my $timeout = shift;
707              
708 9         20 my $intval = int( $timeout ); # sec
709 9         27 my $fraction = int( ( $timeout - $intval ) * 1_000_000 ); # ms
710              
711 9 50 33     135 if ( $Config{osname} eq 'netbsd' && _major_osvers() >= 6 && $Config{longsize} == 4 ) {
      33        
712 0 0       0 if ( defined $Config{use64bitint} ) {
713 0         0 $timeout = pack( 'QL', int( $timeout ), $fraction );
714             } else {
715             $timeout = pack(
716             'LLL',
717             (
718 0 0       0 $Config{byteorder} eq '1234'
719             ? ( $timeout, 0, $fraction )
720             : ( 0, $timeout, $fraction )
721             )
722             );
723             }
724             } else {
725 9         46 $timeout = pack( 'L!L!', $timeout, $fraction );
726             }
727              
728 9         30 return $timeout;
729             }
730              
731             sub _major_osvers {
732 0     0   0 my $osvers = $Config{osvers};
733 0         0 my ( $major_osvers ) = $osvers =~ /^(\d+)/;
734 0         0 $major_osvers += 0;
735              
736 0         0 return $major_osvers;
737             }
738              
# Resolves $name to a textual IP address, stores it in $self->{ip} and
# returns it. $self->{ip} is reset to the empty string first, so an empty
# string is returned whenever no address could be obtained.
#
# - ip_version == $IP_V6: getaddrinfo() is asked for AF_INET6 stream
#   addresses; the first one that getnameinfo() can render numerically is
#   used, and $self->{af} / $self->{pf} are switched to the IPv6 constants.
# - ip_version undefined or == $IP_V4: gethostbyname() is used and the packed
#   address is formatted with inet_ntop() using $self->{af}
#   (NOTE(review): assumes the caller has already set $self->{af} - confirm).
# - any other ip_version: no lookup is attempted.
sub _gethostbyname {
    my ( $self, $name ) = @_;

    $self->{ip} = '';
    my $ip_version = $self->{ip_version};

    if ( defined( $ip_version ) && $ip_version == $IP_V6 ) {
        my ( $lookup_error, @candidates ) = getaddrinfo(
            $name,
            '',     # not interested in the service name
            {
                family      => AF_INET6,
                socktype    => SOCK_STREAM,
                protocol    => IPPROTO_TCP,
            },
        );

        # A failed lookup leaves the empty address in place.
        unless ( $lookup_error ) {
            CANDIDATE:
            foreach my $candidate ( @candidates ) {
                my ( $name_error, $numeric_host ) = getnameinfo( $candidate->{addr}, NI_NUMERICHOST, NIx_NOSERV );
                next CANDIDATE if $name_error;

                # First address with a numeric form wins.
                $self->{af} = AF_INET6;
                $self->{pf} = PF_INET6;
                $self->{ip} = $numeric_host;
                last CANDIDATE;
            }
        }
    } elsif ( !defined( $ip_version ) || $ip_version == $IP_V4 ) {
        # Scalar context: gethostbyname() yields the packed address only.
        my $packed_address = gethostbyname( $name );
        $self->{ip} = inet_ntop( $self->{af}, $packed_address )
            if $packed_address;
    }

    return $self->{ip};
}
778              
# Chooses the address/protocol family for $name and records it in
# $self->{af} and $self->{pf}.
#
# The requested version is $self->{ip_version} (undef is treated as 0,
# meaning "no preference"). IPv6 is considered first, then IPv4; with no
# preference and a host that is not an IP literal, IPv4 is used.
# When an explicit version is requested but $name is an IP literal of the
# other family, $self->_error is called with
# $ERROR_INCOMPATIBLE_HOST_IP_VERSION.
#
# Returns the result of the last is_ipv6()/is_ipv4() check performed on
# $name (true when $name is an IP literal of the family that was examined).
sub _get_family {
    my ( $self, $name ) = @_;

    my $ip_version = $self->{ip_version} // 0;

    # --- IPv6: literal address with no preference, or explicitly requested
    my $is_ip = is_ipv6( $name );
    if ( ( $is_ip && !$ip_version ) || $ip_version == $IP_V6 ) {
        if ( $ip_version ) {
            my $mismatch = $is_ip
                ? $ip_version == $IP_V4
                : is_ipv4( $name )
            ;
            $self->_error( $ERROR_INCOMPATIBLE_HOST_IP_VERSION, format_message( 'ip_version = %s, host = %s', $ip_version, $name ) )
                if $mismatch;
        }

        $self->{af} = AF_INET6;
        $self->{pf} = PF_INET6;
        return $is_ip;
    }

    # --- IPv4: literal address with no preference, or explicitly requested
    $is_ip = is_ipv4( $name );
    if ( ( $is_ip && !$ip_version ) || $ip_version == $IP_V4 ) {
        if ( $ip_version ) {
            my $mismatch = $is_ip
                ? $ip_version == $IP_V6
                : is_ipv6( $name )
            ;
            $self->_error( $ERROR_INCOMPATIBLE_HOST_IP_VERSION, format_message( 'ip_version = %s, host = %s', $ip_version, $name ) )
                if $mismatch;
        }

        $self->{af} = AF_INET;
        $self->{pf} = PF_INET;
        return $is_ip;
    }

    # --- Not an IP literal and no preference: fall back to IPv4.
    # An unrecognised non-zero ip_version leaves af/pf untouched.
    unless ( $ip_version ) {
        $self->{af} = AF_INET;
        $self->{pf} = PF_INET;
    }

    return $is_ip;
}
815              
# Show additional debugging information.
#
# Writes a diagnostic record to STDERR and returns nothing.
#   $message - text to log, or the raw data to dump when $header is given
#   $header  - optional; when true, $message is shown as a hex stream and a
#              formatted hex dump, prefixed by "# $header host:port"
#   $colour  - optional; passed through as the third element of the dump
#              range description
# Without $header a single "[localtime] message" line is printed.
sub _debug_msg {
    my ( $self, $message, $header, $colour ) = @_;

    # Plain text message: one timestamped line, nothing else to do.
    unless ( $header ) {
        say STDERR format_message( '[%s] %s', scalar( localtime ), $message );
        return;
    }

    # The dumper object is created only while $_hdr is still false, so
    # Data::HexDump::Range is loaded on the first dump request only.
    # ($_hdr is declared outside this sub.)
    unless ( $_hdr ) {
        require Data::HexDump::Range;
        $_hdr = Data::HexDump::Range->new(
            FORMAT                          => 'ANSI',  # 'ANSI'|'ASCII'|'HTML'
            COLOR                           => 'bw',    # 'bw' | 'cycle'
            OFFSET_FORMAT                   => 'hex',   # 'hex' | 'dec'
            DATA_WIDTH                      => 16,      # 16 | 20 | ...
            DISPLAY_RANGE_NAME              => 0,       # was passed twice; once is enough
#            MAXIMUM_RANGE_NAME_SIZE         => 16,
            DISPLAY_COLUMN_NAMES            => 1,
            DISPLAY_RULER                   => 1,
            DISPLAY_OFFSET                  => 1,
#            DISPLAY_CUMULATIVE_OFFSET       => 1,
            DISPLAY_ZERO_SIZE_RANGE_WARNING => 0,
            DISPLAY_ZERO_SIZE_RANGE         => 1,
#            DISPLAY_RANGE_SIZE              => 1,
            DISPLAY_ASCII_DUMP              => 1,
            DISPLAY_HEX_DUMP                => 1,
#            DISPLAY_DEC_DUMP                => 1,
#            COLOR_NAMES                     => {},
            ORIENTATION                     => 'horizontal',
        );
    }

    # dump() is deliberately left inside the output list so that it is
    # called in list context.
    say STDERR
        "# $header ", $self->{host}, ':', $self->{port}, "\n",
        '# Hex Stream: ', unpack( 'H*', $message ), "\n",
        $_hdr->dump(
            [
                [ 'data', length( $message ), $colour ],
            ],
            $message
        )
    ;

    return;
}
862              
# Handler for errors.
#
# Converts the given arguments into exception arguments with throw_args(),
# logs the error code and message through _debug_msg when
# $self->debug_level is true, then throws a Kafka::Exception::IO built
# from those arguments.
sub _error {
    my $self = shift;

    my %exception_args = throw_args( @_ );

    if ( $self->debug_level ) {
        $self->_debug_msg(
            format_message( 'throwing IO error %s: %s', @exception_args{ qw( code message ) } )
        );
    }

    Kafka::Exception::IO->throw( %exception_args );
}
871              
872              
873              
874             1;
875              
876             __END__