File Coverage

lib/Kafka/IO.pm
Criterion Covered Total %
statement 261 332 78.6
branch 80 170 47.0
condition 47 128 36.7
subroutine 34 37 91.8
pod 4 4 100.0
total 426 671 63.4


line stmt bran cond sub pod time code
1             package Kafka::IO;
2              
3             =head1 NAME
4              
5             Kafka::IO - Interface to network communication with the Apache Kafka server.
6              
7             =head1 VERSION
8              
9             This documentation refers to C<Kafka::IO> version 1.06 .
10              
11             =cut
12              
13              
14              
15 15     15   176859 use 5.010;
  15         64  
16 15     15   107 use strict;
  15         34  
  15         408  
17 15     15   90 use warnings;
  15         31  
  15         983  
18              
19              
20              
21             our $DEBUG = 0;
22              
23             our $VERSION = '1.06';
24              
25              
26              
27 15     15   94 use Carp;
  15         36  
  15         880  
28 15     15   98 use Config;
  15         34  
  15         652  
29 15     15   107 use Const::Fast;
  15         34  
  15         133  
30 15         975 use Data::Validate::Domain qw(
31             is_hostname
32 15     15   7305 );
  15         155549  
33 15         1546 use Data::Validate::IP qw(
34             is_ipv4
35             is_ipv6
36 15     15   7705 );
  15         381124  
37 15         1187 use Errno qw(
38             EAGAIN
39             ECONNRESET
40             EINTR
41             EWOULDBLOCK
42             ETIMEDOUT
43 15     15   3308 );
  15         9661  
44 15     15   104 use Fcntl;
  15         35  
  15         3620  
45 15     15   8777 use IO::Select;
  15         25480  
  15         766  
46 15         839 use Params::Util qw(
47             _STRING
48 15     15   1958 );
  15         9054  
49 15         126 use POSIX qw(
50             ceil
51 15     15   3159 );
  15         44082  
52 15         1032 use Scalar::Util qw(
53             dualvar
54 15     15   10319 );
  15         35  
55 15         1973 use Socket qw(
56             AF_INET
57             AF_INET6
58             IPPROTO_TCP
59             MSG_DONTWAIT
60             MSG_PEEK
61             NI_NUMERICHOST
62             NIx_NOSERV
63             PF_INET
64             PF_INET6
65             SOCK_STREAM
66             SOL_SOCKET
67             SO_ERROR
68             SO_RCVTIMEO
69             SO_SNDTIMEO
70             getaddrinfo
71             getnameinfo
72             inet_aton
73             inet_pton
74             inet_ntop
75             pack_sockaddr_in
76             pack_sockaddr_in6
77 15     15   107 );
  15         68  
78 15         910 use Sys::SigAction qw(
79             set_sig_handler
80 15     15   7464 );
  15         48387  
81 15     15   119 use Time::HiRes ();
  15         37  
  15         312  
82 15     15   1804 use Try::Tiny;
  15         6461  
  15         1034  
83              
84 15         2436 use Kafka qw(
85             $ERROR_CANNOT_BIND
86             $ERROR_CANNOT_RECV
87             $ERROR_CANNOT_SEND
88             $ERROR_MISMATCH_ARGUMENT
89             $ERROR_INCOMPATIBLE_HOST_IP_VERSION
90             $ERROR_NO_CONNECTION
91             $IP_V4
92             $IP_V6
93             $KAFKA_SERVER_PORT
94             $REQUEST_TIMEOUT
95 15     15   101 );
  15         35  
96 15     15   3822 use Kafka::Exceptions;
  15         50  
  15         955  
97 15         47260 use Kafka::Internals qw(
98             $MAX_SOCKET_REQUEST_BYTES
99             debug_level
100             format_message
101 15     15   106 );
  15         32  
102              
103              
104              
105             =head1 SYNOPSIS
106              
107             use 5.010;
108             use strict;
109             use warnings;
110              
111             use Scalar::Util qw(
112             blessed
113             );
114             use Try::Tiny;
115              
116             use Kafka::IO;
117              
118             my $io;
119             try {
120             $io = Kafka::IO->new( host => 'localhost' );
121             } catch {
122             my $error = $_;
123             if ( blessed( $error ) && $error->isa( 'Kafka::Exception' ) ) {
124             warn 'Error: (', $error->code, ') ', $error->message, "\n";
125             exit;
126             } else {
127             die $error;
128             }
129             };
130              
131             # Closes and cleans up
132             $io->close;
133             undef $io;
134              
135             =head1 DESCRIPTION
136              
137             This module is private and should not be used directly.
138              
139             In order to achieve better performance, methods of this module do not
140             perform arguments validation.
141              
142             The main features of the C<Kafka::IO> class are:
143              
144             =over 3
145              
146             =item *
147              
148             Provides an object oriented API for communication with Kafka.
149              
150             =item *
151              
152             This class allows you to create Kafka 0.9+ clients.
153              
154             =back
155              
156             =cut
157              
158             # Hard limit of IO operation retry attempts, to prevent high CPU usage in IO retry loop
                # Both send() and receive() leave their retry loop once this many counted attempts were made;
                # attempts interrupted by EINTR (and, in receive(), attempts that read data) are not counted.
159             const my $MAX_RETRIES => 30;
160              
                # NOTE(review): not referenced anywhere in the visible part of this file - confirm its purpose.
161             our $_hdr;
162              
163             #-- constructor ----------------------------------------------------------------
164              
165             =head2 CONSTRUCTOR
166              
167             =head3 C<new>
168              
169             Establishes TCP connection to given host and port, creates and returns C<Kafka::IO> IO object.
170              
171             C<new> takes arguments in key-value pairs. The following arguments are currently recognized:
172              
173             =over 3
174              
175             =item C<host =E<gt> $host>
176              
177             C<$host> is Kafka host to connect to. It can be a host name or an IP-address in
178             IPv4 or IPv6 form (for example '127.0.0.1', '0:0:0:0:0:0:0:1' or '::1').
179              
180             =item C<port =E<gt> $port>
181              
182             Optional, default = C<$KAFKA_SERVER_PORT>.
183              
184             C<$port> is integer attribute denoting the port number used to access Apache Kafka.
185              
186             C<$KAFKA_SERVER_PORT> is the default Apache Kafka server port that can be imported
187             from the L<Kafka|Kafka> module.
188              
189             =item C<timeout =E<gt> $timeout>
190              
191             C<$REQUEST_TIMEOUT> is the default timeout that can be imported from the L<Kafka|Kafka> module.
192              
193             Special behavior when C<timeout> is set to C<undef>:
194              
195             =back
196              
197             =over 3
198              
199             =item *
200              
201             Alarms are not used internally (namely when performing C<gethostbyname>).
202              
203             =item *
204              
205             Default C<$REQUEST_TIMEOUT> is used for the rest of IO operations.
206              
207             =back
208              
209             =over 3
210              
211             =item C<ip_version =E<gt> $ip_version>
212              
213             Force version of IP protocol for resolving host name (or interpretation of passed address).
214              
215             Optional, undefined by default, which works in the following way: version of IP address
216             is detected automatically, host name is resolved into IPv4 address.
217              
218             See description of L<$IP_V4|Kafka::IO/$IP_V4>, L<$IP_V6|Kafka::IO/$IP_V6>
219             in C<Kafka> L<EXPORT|Kafka/EXPORT>.
220              
221             =back
222              
223             =cut
# Constructor.
#
# Takes key/value arguments, overlays the recognised ones on the defaults,
# untaints host and port, and then connects via _connect(). A failure inside
# _connect() is caught and reported through _error() with $ERROR_CANNOT_BIND.
# Returns the new object.
sub new {
    my ( $class, %p ) = @_;

    my $self = bless {
        host        => '',
        timeout     => $REQUEST_TIMEOUT,
        port        => $KAFKA_SERVER_PORT,
        ip_version  => undef,
        af          => '',  # Address family constant
        pf          => '',  # Protocol family constant
        ip          => '',  # Human-readable textual representation of the ip address
    }, $class;

    # Only keys that already exist in the default set may be overridden.
    for my $attribute ( keys %$self ) {
        next unless exists $p{ $attribute };
        $self->{ $attribute } = $p{ $attribute };
    }

    # we trust it: make it untainted
    for my $attribute ( qw( host port ) ) {
        ( $self->{ $attribute } ) = $self->{ $attribute } =~ /\A(.+)\z/;
    }

    $self->{socket}     = undef;
    $self->{_io_select} = undef;

    my $error;
    try {
        $self->_connect();
    } catch {
        $error = $_;
    };

    if ( defined $error ) {
        $self->_error( $ERROR_CANNOT_BIND, format_message("Kafka::IO(%s:%s)->new: %s", $self->{host}, $self->{port}, $error ) );
    }

    return $self;
}
257              
258             #-- public attributes ----------------------------------------------------------
259              
260             =head2 METHODS
261              
262             The following methods are provided by C<Kafka::IO> class:
263              
264             =cut
265              
266             =head3 C<< send( $message <, $timeout> ) >>
267              
268             Sends a C<$message> to Kafka.
269              
270             The argument must be a bytes string.
271              
272             Use optional C<$timeout> argument to override default timeout for this request only.
273              
274             Returns the number of characters sent.
275              
276             =cut
# Writes $message to the connected socket.
#
# Arguments:
#   $message - non-empty string, no longer than $MAX_SOCKET_REQUEST_BYTES;
#   $timeout - optional per-call timeout in seconds (must be > 0); defaults to
#              the object's timeout, then to $REQUEST_TIMEOUT.
#
# Returns the number of bytes written (equal to length( $message ) on success).
#
# Any failure is reported through $self->_error (defined outside this excerpt;
# presumably it throws): $ERROR_MISMATCH_ARGUMENT for bad arguments,
# $ERROR_NO_CONNECTION when the socket is closed or the peer has gone away,
# $ERROR_CANNOT_SEND otherwise.
sub send {
    my ( $self, $message, $timeout ) = @_;
    $self->_error( $ERROR_MISMATCH_ARGUMENT, '->send' )
        unless defined( _STRING( $message ) )
    ;
    my $length = length( $message );
    $self->_error( $ERROR_MISMATCH_ARGUMENT, '->send' )
        unless $length <= $MAX_SOCKET_REQUEST_BYTES
    ;
    $timeout = $self->{timeout} // $REQUEST_TIMEOUT unless defined $timeout;
    # The error context used to say '->receive' here (copy-paste slip), which
    # pointed the caller at the wrong method.
    $self->_error( $ERROR_MISMATCH_ARGUMENT, '->send' )
        unless $timeout > 0
    ;
    my $select = $self->{_io_select};
    $self->_error( $ERROR_NO_CONNECTION, 'Attempt to work with a closed socket' ) unless $select;

    $self->_debug_msg( $message, 'Request to', 'green' )
        if $self->debug_level >= 2
    ;
    my $sent = 0;

    my $started = Time::HiRes::time();
    my $until = $started + $timeout;   # absolute deadline shared by all attempts

    my $error_code;
    my $errno;
    my $retries = 0;
    my $interrupts = 0;
    ATTEMPT: while ( $sent < $length && $retries++ < $MAX_RETRIES ) {
        my $remaining_time = $until - Time::HiRes::time();
        last ATTEMPT if $remaining_time <= 0; # timeout expired

        # Wait until the socket is writable, for at most the time that is left.
        undef $!;
        my $can_write = $select->can_write( $remaining_time );
        $errno = $!;
        if ( $errno ) {
            if ( $errno == EINTR ) {
                undef $errno;
                --$retries;         # this attempt does not count
                ++$interrupts;
                next ATTEMPT;
            }

            $self->close;

            last ATTEMPT;
        }

        if ( $can_write ) {
            # check for EOF on the first attempt only
            if ( $retries == 1 && $self->_is_close_wait ) {
                $self->close;
                $error_code = $ERROR_NO_CONNECTION;
                last ATTEMPT;
            }

            undef $!;
            my $wrote = CORE::send( $self->{socket}, $message, MSG_DONTWAIT );
            $errno = $!;

            if( defined $wrote && $wrote > 0 ) {
                $sent += $wrote;
                if ( $sent < $length ) {
                    # remove written data from message
                    $message = substr( $message, $wrote );
                }
            }

            if( $errno ) {
                if( $errno == EINTR ) {
                    undef $errno;
                    --$retries;     # this attempt does not count
                    ++$interrupts;
                    next ATTEMPT;
                } elsif (
                        $errno != EAGAIN
                    &&  $errno != EWOULDBLOCK
                    ## on freebsd, if we got ECONNRESET, it's a timeout from the other side
                    && !( $errno == ECONNRESET && $^O eq 'freebsd' )
                ) {
                    # Anything else is treated as fatal for this connection.
                    $self->close;
                    last ATTEMPT;
                }
            }

            last ATTEMPT unless defined $wrote;
        }
    }

    # Success means: no pending errno and every byte written.
    unless( !$errno && defined( $sent ) && $sent == $length )
    {
        $self->_error(
            $error_code // $ERROR_CANNOT_SEND,
            format_message( "Kafka::IO(%s)->send: ERRNO=%s ERROR='%s' (length=%s, sent=%s, timeout=%s, retries=%s, interrupts=%s, secs=%.6f)",
                $self->{host},
                ( $errno // 0 ) + 0,
                ( $errno // '' ) . '',
                $length,
                $sent,
                $timeout,
                $retries,
                $interrupts,
                Time::HiRes::time() - $started,
            )
        );
    }

    return $sent;
}
386              
387             =head3 C<< receive( $length <, $timeout> ) >>
388              
389             Receives a message up to C<$length> size from Kafka.
390              
391             C<$length> argument must be a positive number.
392              
393             Use optional C<$timeout> argument to override default timeout for this call only.
394              
395             Returns a reference to the received message.
396              
397             =cut
# Reads exactly $length bytes from the connected socket.
#
# Arguments:
#   $length  - number of bytes to read, must be > 0;
#   $timeout - optional per-call timeout in seconds (must be > 0); defaults to
#              the object's timeout, then to $REQUEST_TIMEOUT.
#
# Returns a reference to the (tainted) received string.
#
# Failures are reported through $self->_error: $ERROR_MISMATCH_ARGUMENT for
# bad arguments, $ERROR_NO_CONNECTION for a closed socket or an EOF before any
# data arrived, $ERROR_CANNOT_RECV otherwise.
sub receive {
    my ( $self, $length, $timeout ) = @_;

    $self->_error( $ERROR_MISMATCH_ARGUMENT, '->receive' )
        unless $length > 0;

    $timeout //= $self->{timeout} // $REQUEST_TIMEOUT;
    $self->_error( $ERROR_MISMATCH_ARGUMENT, '->receive' )
        unless $timeout > 0;

    my $selector = $self->{_io_select};
    $self->_error( $ERROR_NO_CONNECTION, 'Attempt to work with a closed socket' )
        unless $selector;

    my $received    = '';
    my $outstanding = $length;

    my $started  = Time::HiRes::time();
    my $deadline = $started + $timeout;

    my ( $error_code, $errno );
    my $retries    = 0;
    my $interrupts = 0;

    ATTEMPT:
    while ( $outstanding > 0 && $retries++ < $MAX_RETRIES ) {
        my $time_left = $deadline - Time::HiRes::time();
        last ATTEMPT if $time_left <= 0;    # timeout expired

        # Wait for readability, for at most the time that is left.
        undef $!;
        my $readable = $selector->can_read( $time_left );
        $errno = $!;
        if ( $errno ) {
            if ( $errno != EINTR ) {
                $self->close;
                last ATTEMPT;
            }

            undef $errno;
            --$retries;     # this attempt does not count
            ++$interrupts;
            next ATTEMPT;
        }

        next ATTEMPT unless $readable;

        my $chunk = '';
        undef $!;
        my $status = CORE::recv( $self->{socket}, $chunk, $outstanding, MSG_DONTWAIT );
        $errno = $!;

        my $chunk_length = length( $chunk );
        if ( defined( $status ) && $chunk_length ) {
            $received   .= $chunk;
            $outstanding = $length - length( $received );
            --$retries;     # this attempt was successful, don't count as a retry
        }

        if ( $errno ) {
            if ( $errno == EINTR ) {
                undef $errno;
                --$retries; # this attempt does not count
                ++$interrupts;
                next ATTEMPT;
            }

            ## on freebsd, if we got ECONNRESET, it's a timeout from the other side
            my $is_fatal =
                    $errno != EAGAIN
                &&  $errno != EWOULDBLOCK
                && !( $errno == ECONNRESET && $^O eq 'freebsd' );
            if ( $is_fatal ) {
                $self->close;
                last ATTEMPT;
            }
        }

        next ATTEMPT if $chunk_length;

        if ( defined( $status ) && !$errno ) {
            # no error and nothing received with select returning "can read" means EOF: other side closed socket
            $self->_debug_msg( 'EOF on receive attempt, closing socket' )
                if $self->debug_level;
            $self->close;

            # we did not receive anything yet, so we may (in some cases) reconnect and try again
            $error_code = $ERROR_NO_CONNECTION
                if length( $received ) == 0;

            last ATTEMPT;
        }

        # we did not read anything on this attempt: wait a bit before the next one; should not happen, but just in case...
        my $attempts_left = $MAX_RETRIES - $retries;
        next ATTEMPT unless $attempts_left;

        $time_left = $deadline - Time::HiRes::time();
        my $pause = int( $time_left * 1e6 / $attempts_left );   # microseconds
        next ATTEMPT unless $pause > 0;

        $pause = 250_000 if $pause > 250_000;   # prevent long sleeps if total remaining time is big
        $self->_debug_msg( format_message( 'sleeping (remaining attempts %d, time %.6f): %d microseconds', $attempts_left, $time_left, $pause ) )
            if $self->debug_level;
        Time::HiRes::usleep( $pause );
    }

    # Anything short of "no pending errno and the full length read" is an error.
    if ( $errno || length( $received ) < $length ) {
        $self->_error(
            $error_code // $ERROR_CANNOT_RECV,
            format_message( "Kafka::IO(%s)->receive: ERRNO=%s ERROR='%s' (length=%s, received=%s, timeout=%s, retries=%s, interrupts=%s, secs=%.6f)",
                $self->{host},
                ( $errno // 0 ) + 0,
                ( $errno // '' ) . '',
                $length,
                length( $received ),
                $timeout,
                $retries,
                $interrupts,
                Time::HiRes::time() - $started,
            ),
        );
    }

    $self->_debug_msg( $received, 'Response from', 'yellow' )
        if $self->debug_level >= 2;

    # returns tainted data
    return \$received;
}
520              
521             =head3 C<close>
522              
523             Closes connection to Kafka server.
524             Returns true if those operations succeed and if no error was reported by any PerlIO layer.
525              
526             =cut
# Closes the socket, if there is one, and forgets both the socket and its
# IO::Select wrapper. Returns 1 when nothing was open, otherwise the result
# of CORE::close.
sub close {
    my ( $self ) = @_;

    return 1 unless $self->{socket};

    my $closed = CORE::close( $self->{socket} );
    $self->{socket}     = undef;
    $self->{_io_select} = undef;

    return $closed;
}
539              
# Tells whether the connection has already been closed, locally or by the peer.
#
# Returns 1 when the object holds no socket/selector, an empty value when the
# socket has nothing to read, and otherwise true only if a non-destructive
# one-byte peek succeeds without error and yields no data (i.e. EOF).
sub _is_close_wait {
    my ( $self ) = @_;

    my ( $socket, $selector ) = @{ $self }{ qw( socket _io_select ) };
    return 1 unless $socket && $selector;   # closed already

    # http://stefan.buettcher.org/cs/conn_closed.html
    # socket is open; check if we can read, and if we can but recv() cannot peek, it means we got EOF
    return unless $selector->can_read( 0 ); # we cannot read, but may be able to write

    my $peeked = '';
    undef $!;
    my $status = CORE::recv( $socket, $peeked, 1, MSG_DONTWAIT | MSG_PEEK );    # peek, do not remove data from queue

    # EOF when there is no error, status is defined, but result is empty
    return !$! && defined( $status ) && length( $peeked ) == 0;
}
552              
# The method verifies if we can connect to a Kafka broker.
# This is evil: opens and immediately closes a NEW connection so do not use unless there is a strong reason for it.
#
# Returns an empty value when the object has no socket, when the peer address
# of that socket cannot be determined, or when the probe socket cannot be
# created; otherwise returns the result of connect() on the probe socket.
sub _is_alive {
    my ( $self ) = @_;

    my $socket = $self->{socket};
    return unless $socket;

    # Without a peer address there is nothing to probe; previously an undef
    # address was handed straight to connect().
    my $peer_address = getpeername( $socket );
    return unless defined $peer_address;

    # Previously a failed socket() went unnoticed and connect()/close() were
    # called on an unopened handle.
    socket( my $probe, $self->{pf}, SOCK_STREAM, IPPROTO_TCP )
        or return;

    my $is_alive = connect( $probe, $peer_address );
    CORE::close( $probe );

    return $is_alive;
}
567              
568             #-- private attributes ---------------------------------------------------------
569              
570             #-- private methods ------------------------------------------------------------
571              
# You need to have access to Kafka instance and be able to connect through TCP.
# uses http://devpit.org/wiki/Connect%28%29_with_timeout_%28in_Perl%29
#
# Resolves the configured host (unless _get_family() reports that no lookup is
# needed), opens a TCP socket to host:port with a bounded connect time, applies
# send/receive timeouts to the socket and stores it, together with an
# IO::Select wrapper, in the object.
#
# Returns the connected socket. Dies with a descriptive message on failure.
sub _connect {
    my ( $self ) = @_;

    $self->{socket}     = undef;
    $self->{_io_select} = undef;

    my $name    = $self->{host};
    my $port    = $self->{port};
    my $timeout = $self->{timeout};

    my $ip = '';
    if ( $self->_get_family( $name ) ) {
        # NOTE(review): a true result presumably means the host is an IP literal - confirm in _get_family.
        $ip = $self->{ip} = $name;
    } else {
        if ( defined $timeout ) {
            my $remaining;
            my $start = time();

            $self->_debug_msg( format_message( "name = '%s', number of wallclock seconds = %s", $name, ceil( $timeout ) ) )
                if $self->debug_level;

            # DNS lookup.
            local $@;
            my $h = set_sig_handler( 'ALRM', sub { die 'alarm clock restarted' },
                {
                    mask    => [ 'ALRM' ],
                    safe    => 0,   # perl 5.8+ uses safe signal delivery so we need unsafe signal for timeout to work
                }
            );
            eval {
                # alarm() returns the seconds left on any alarm the caller had pending.
                $remaining = alarm( ceil( $timeout ) );
                $ip = $self->_gethostbyname( $name );
                alarm 0;
            };
            alarm 0;                                # race condition protection
            my $error = $@;
            undef $h;

            $self->_debug_msg( format_message( "_connect: ip = '%s', error = '%s', \$? = %s, \$! = '%s'", $ip, $error, $?, $! ) )
                if $self->debug_level;

            die $error if $error;
            die( format_message( "gethostbyname %s: \$? = '%s', \$! = '%s'\n", $name, $?, $! ) ) unless $ip;

            my $elapsed = time() - $start;
            # $SIG{ALRM} restored automatically, but we need to restart previous alarm manually

            $self->_debug_msg( format_message( '_connect: %s (remaining) - %s (elapsed) = %s', $remaining, $elapsed, $remaining - $elapsed ) )
                if $self->debug_level;
            if ( $remaining ) {
                if ( $remaining - $elapsed > 0 ) {
                    $self->_debug_msg( '_connect: remaining - elapsed > 0 (to alarm restart)' )
                        if $self->debug_level;
                    alarm( ceil( $remaining - $elapsed ) );
                } else {
                    $self->_debug_msg( '_connect: remaining - elapsed < 0 (to alarm function call)' )
                        if $self->debug_level;
                    # $SIG{ALRM}->();
                    # The caller's alarm would already have fired: deliver it now.
                    kill ALRM => $$;
                }
                $self->_debug_msg( "_connect: after alarm 'recalled'" )
                    if $self->debug_level;
            }
        } else {
            # No timeout configured: resolve without using alarm().
            $ip = $self->_gethostbyname( $name );
            die( format_message( "could not resolve host name to IP address: %s\n", $name ) ) unless $ip;
        }
    }

    # Create socket.
    socket( my $connection, $self->{pf}, SOCK_STREAM, scalar getprotobyname( 'tcp' ) ) or die( "socket: $!\n" );

    # Set autoflushing.
    my $file_handle = select( $connection ); $| = 1; select $file_handle;

    # Set FD_CLOEXEC.
    # FD_CLOEXEC is a file *descriptor* flag, so it has to go through
    # F_GETFD/F_SETFD; the former F_GETFL/F_SETFL calls operated on the file
    # status flags and never requested close-on-exec.
    my $flags = fcntl( $connection, F_GETFD, 0 )        or die "fcntl F_GETFD: $!\n";               # 0 for error, 0e0 for 0.
    fcntl( $connection, F_SETFD, $flags | FD_CLOEXEC )  or die "fcntl F_SETFD FD_CLOEXEC: $!\n";    # 0 for error, 0e0 for 0.

    $flags = fcntl( $connection, F_GETFL, 0 )           or die "fcntl F_GETFL: $!\n";               # 0 for error, 0e0 for 0.
    fcntl( $connection, F_SETFL, $flags | O_NONBLOCK )  or die "fcntl F_SETFL O_NONBLOCK: $!\n";    # 0 for error, 0e0 for 0.

    # Connect returns immediately because of O_NONBLOCK.
    my $sockaddr = $self->{af} eq AF_INET
        ? pack_sockaddr_in(  $port, inet_aton( $ip ) )
        : pack_sockaddr_in6( $port, inet_pton( $self->{af}, $ip ) )
    ;
    connect( $connection, $sockaddr ) || $!{EINPROGRESS} || die( format_message( "connect ip = %s, port = %s: %s\n", $ip, $port, $! ) );

    # Reset O_NONBLOCK.
    $flags = fcntl( $connection, F_GETFL, 0 )            or die "fcntl F_GETFL: $!\n";                  # 0 for error, 0e0 for 0.
    fcntl( $connection, F_SETFL, $flags & ~ O_NONBLOCK ) or die "fcntl F_SETFL not O_NONBLOCK: $!\n";   # 0 for error, 0e0 for 0.

    # Use select() to poll for completion or error. When connect succeeds we can write.
    my $vec = '';
    vec( $vec, fileno( $connection ), 1 ) = 1;
    select( undef, $vec, undef, $timeout // $REQUEST_TIMEOUT );
    unless ( vec( $vec, fileno( $connection ), 1 ) ) {
        # If no response yet, impose our own timeout.
        $! = ETIMEDOUT;
        die( format_message( "connect ip = %s, port = %s: %s\n", $ip, $port, $! ) );
    }

    # This is how we see whether it connected or there was an error. Document Unix, are you kidding?!
    $! = unpack( 'L', getsockopt( $connection, SOL_SOCKET, SO_ERROR ) );
    die( format_message( "connect ip = %s, port = %s: %s\n", $ip, $port, $! ) ) if $!;

    # Set timeout on all reads and writes.
    #
    # Note the difference between Perl's sysread() and read() calls: sysread()
    # queries the kernel exactly once, with max delay specified here. read()
    # queries the kernel repeatedly until there's a read error (such as this
    # timeout), EOF, or a full buffer. So when using read() with a timeout of one
    # second, if the remote server sends 1 byte repeatedly at 1 second intervals,
    # read() will read the whole buffer very slowly and sysread() will return only
    # the first byte. The print() and syswrite() calls are similarly different.
    # <> is of course similar to read() but delimited by newlines instead of buffer
    # sizes.
    my $timeval = _get_timeval( $timeout // $REQUEST_TIMEOUT );
    setsockopt( $connection, SOL_SOCKET, SO_SNDTIMEO, $timeval ) // die "setsockopt SOL_SOCKET, SO_SNDTIMEO: $!\n";
    setsockopt( $connection, SOL_SOCKET, SO_RCVTIMEO, $timeval ) // die "setsockopt SOL_SOCKET, SO_RCVTIMEO: $!\n";

    $self->{socket} = $connection;
    my $s = $self->{_io_select} = IO::Select->new;
    $s->add( $self->{socket} );

    return $connection;
}
702              
# Packing timeval
# uses http://trinitum.org/wp/packing-timeval/
#
# Converts a timeout given in (possibly fractional) seconds into the packed
# seconds/microseconds pair that is passed to setsockopt() for SO_SNDTIMEO and
# SO_RCVTIMEO.
#
# The microsecond part is rounded to the nearest microsecond. Plain truncation
# lost one microsecond to binary floating point for inputs such as 2.3
# (2 s + 299999 us instead of 2 s + 300000 us).
sub _get_timeval {
    my $timeout = shift;

    my $seconds      = int( $timeout );                                         # sec
    my $microseconds = int( ( $timeout - $seconds ) * 1_000_000 + 0.5 );        # usec, rounded
    if ( $microseconds >= 1_000_000 ) {
        # Rounding reached a whole second: carry it, the usec field must stay below 1_000_000.
        ++$seconds;
        $microseconds -= 1_000_000;
    }

    # NetBSD 6+ with a 4-byte long: the seconds field is packed as 64 bits,
    # either natively ('Q') or as two 32-bit halves ordered by byte order.
    if ( $Config{osname} eq 'netbsd' && _major_osvers() >= 6 && $Config{longsize} == 4 ) {
        return pack( 'QL', $seconds, $microseconds )
            if defined $Config{use64bitint};

        return pack(
            'LLL',
            (
                $Config{byteorder} eq '1234'
                    ? ( $seconds, 0, $microseconds )
                    : ( 0, $seconds, $microseconds )
            )
        );
    }

    return pack( 'L!L!', $seconds, $microseconds );
}
730              
# Returns the leading run of digits of $Config{osvers} as a number
# (0 when the version string does not start with a digit).
sub _major_osvers {
    my ( $leading_digits ) = $Config{osvers} =~ /^(\d+)/;

    return ( $leading_digits // 0 ) + 0;
}
738              
# Resolves $name to a textual IP address and stores it in $self->{ip}.
#
# With ip_version == $IP_V6 the name is looked up through getaddrinfo()
# restricted to AF_INET6, and the first address that getnameinfo() can render
# numerically is used (af/pf are switched to the IPv6 constants).
# With ip_version undefined or == $IP_V4 the name is looked up through
# gethostbyname() and rendered with inet_ntop() using the object's current
# address family.
#
# Returns $self->{ip}, which stays '' when nothing could be resolved.
sub _gethostbyname {
    my ( $self, $name ) = @_;

    $self->{ip} = '';
    my $ip_version = $self->{ip_version};

    if ( defined( $ip_version ) && $ip_version == $IP_V6 ) {
        my ( $lookup_error, @candidates ) = getaddrinfo(
            $name,
            '',     # not interested in the service name
            {
                family      => AF_INET6,
                socktype    => SOCK_STREAM,
                protocol    => IPPROTO_TCP,
            },
        );
        return( $self->{ip} ) if $lookup_error;

        CANDIDATE:
        for my $candidate ( @candidates ) {
            my ( $name_error, $numeric_host ) = getnameinfo( $candidate->{addr}, NI_NUMERICHOST, NIx_NOSERV );
            next CANDIDATE if $name_error;

            $self->{af} = AF_INET6;
            $self->{pf} = PF_INET6;
            $self->{ip} = $numeric_host;
            last CANDIDATE;
        }

        # An IPv6 request never falls back to the IPv4 lookup below.
        return $self->{ip};
    }

    if ( !defined( $ip_version ) || $ip_version == $IP_V4 ) {
        my $packed_address = gethostbyname( $name );
        $self->{ip} = inet_ntop( $self->{af}, $packed_address )
            if $packed_address;
    }

    return $self->{ip};
}
778              
# Chooses the address/protocol family for $name and stores it in
# $self->{af} / $self->{pf}.
#
# * IPv6 family when $name passes is_ipv6() and no ip_version is configured,
#   or when ip_version == $IP_V6.
# * IPv4 family when $name passes is_ipv4() and no ip_version is configured,
#   when ip_version == $IP_V4, or as the fallback when no ip_version is set.
# * Nothing is stored when ip_version is set to some other value.
#
# Reports $ERROR_INCOMPATIBLE_HOST_IP_VERSION through _error() when a
# configured ip_version contradicts a literal address of the other family.
#
# Returns the result of the last is_ipv6()/is_ipv4() check made on $name.
sub _get_family {
    my ( $self, $name ) = @_;

    my $ip_version = $self->{ip_version} // 0;
    my $is_ip      = is_ipv6( $name );

    if ( ( $is_ip && !$ip_version ) || $ip_version == $IP_V6 ) {
        if ( $ip_version ) {
            # A literal address must match the forced version; anything that
            # is not an IPv6 literal must not be an IPv4 literal either.
            my $conflict = $is_ip
                ? $ip_version == $IP_V4
                : is_ipv4( $name );
            $self->_error( $ERROR_INCOMPATIBLE_HOST_IP_VERSION, format_message( 'ip_version = %s, host = %s', $ip_version, $name ) )
                if $conflict;
        }

        $self->{af} = AF_INET6;
        $self->{pf} = PF_INET6;

        return $is_ip;
    }

    $is_ip = is_ipv4( $name );

    if ( ( $is_ip && !$ip_version ) || $ip_version == $IP_V4 ) {
        if ( $ip_version ) {
            my $conflict = $is_ip
                ? $ip_version == $IP_V6
                : is_ipv6( $name );
            $self->_error( $ERROR_INCOMPATIBLE_HOST_IP_VERSION, format_message( 'ip_version = %s, host = %s', $ip_version, $name ) )
                if $conflict;
        }

        $self->{af} = AF_INET;
        $self->{pf} = PF_INET;
    } elsif ( !$ip_version ) {
        # Not a literal IP and no version requested: fall back to IPv4.
        $self->{af} = AF_INET;
        $self->{pf} = PF_INET;
    }

    return $is_ip;
}
815              
# Show additional debugging information on STDERR.
#
#   $message - text to print, or the raw data to dump when $header is given
#   $header  - optional; when true, a '# <header> host:port' line, the hex
#              stream of $message and a Data::HexDump::Range dump are printed
#   $colour  - optional; passed through as the colour of the dumped range
#
# Without $header the message is printed as '[<local time>] <message>'.
# Always returns nothing.
sub _debug_msg {
    my ( $self, $message, $header, $colour ) = @_;

    unless ( $header ) {
        say STDERR format_message( '[%s] %s', scalar( localtime ), $message );
        return;
    }

    # The dumper is created on first use only, so Data::HexDump::Range is
    # loaded just when a hex dump is actually requested.
    $_hdr ||= do {
        require Data::HexDump::Range;
        Data::HexDump::Range->new(
            FORMAT                          => 'ANSI',  # 'ANSI'|'ASCII'|'HTML'
            COLOR                           => 'bw',    # 'bw' | 'cycle'
            OFFSET_FORMAT                   => 'hex',   # 'hex' | 'dec'
            DATA_WIDTH                      => 16,      # 16 | 20 | ...
            DISPLAY_RANGE_NAME              => 0,       # was passed twice; once is enough
#            MAXIMUM_RANGE_NAME_SIZE         => 16,
            DISPLAY_COLUMN_NAMES            => 1,
            DISPLAY_RULER                   => 1,
            DISPLAY_OFFSET                  => 1,
#            DISPLAY_CUMULATIVE_OFFSET       => 1,
            DISPLAY_ZERO_SIZE_RANGE_WARNING => 0,
            DISPLAY_ZERO_SIZE_RANGE         => 1,
#            DISPLAY_RANGE_SIZE              => 1,
            DISPLAY_ASCII_DUMP              => 1,
            DISPLAY_HEX_DUMP                => 1,
#            DISPLAY_DEC_DUMP                => 1,
#            COLOR_NAMES                     => {},
            ORIENTATION                     => 'horizontal',
        );
    };

    say STDERR
        "# $header ", $self->{host}, ':', $self->{port}, "\n",
        '# Hex Stream: ', unpack( 'H*', $message ), "\n",
        $_hdr->dump(
            [
                [ 'data', length( $message ), $colour ],
            ],
            $message
        )
    ;

    return;
}
862              
# Handler for errors.
# Normalises the error description with throw_args(), logs it through
# _debug_msg() when the object's debug_level is true, then hands it to
# Kafka::Exception::IO->throw.
sub _error {
    my ( $self, @error_details ) = @_;

    my %args = throw_args( @error_details );

    if ( $self->debug_level ) {
        $self->_debug_msg( format_message( 'throwing IO error %s: %s', $args{code}, $args{message} ) );
    }

    Kafka::Exception::IO->throw( %args );
}
871              
872              
873              
874             1;
875              
876             __END__