File Coverage

lib/Kafka/Internals.pm
Criterion Covered Total %
statement 75 75 100.0
branch 17 20 85.0
condition 12 17 70.5
subroutine 17 17 100.0
pod 3 3 100.0
total 124 132 93.9


line stmt bran cond sub pod time code
1             package Kafka::Internals;
2              
3             =head1 NAME
4              
5             Kafka::Internals - Constants and functions used internally.
6              
7             =head1 VERSION
8              
9             This documentation refers to C version 1.06 .
10              
11             =cut
12              
13              
14              
15 16     16   352418 use 5.010;
  16         63  
16 16     16   107 use strict;
  16         40  
  16         440  
17 16     16   103 use warnings;
  16         43  
  16         1045  
18              
19              
20              
21             our $VERSION = '1.06';
22              
23 16         1116 use Exporter qw(
24             import
25 16     16   247 );
  16         72  
26              
27             our @EXPORT_OK = qw(
28             $APIKEY_PRODUCE
29             $APIKEY_FETCH
30             $APIKEY_OFFSET
31             $APIKEY_METADATA
32             $APIKEY_FINDCOORDINATOR
33             $APIKEY_APIVERSIONS
34             $APIKEY_OFFSETCOMMIT
35             $APIKEY_OFFSETFETCH
36             $DEFAULT_RAISE_ERROR
37             $MAX_CORRELATIONID
38             $MAX_INT16
39             $MAX_INT32
40             $MAX_SOCKET_REQUEST_BYTES
41             $PRODUCER_ANY_OFFSET
42             debug_level
43             _isbig
44             _get_CorrelationId
45             format_message
46             format_reference
47             );
48              
49              
50              
51 16     16   117 use overload;
  16         34  
  16         189  
52 16     16   1182 use Carp;
  16         37  
  16         1006  
53 16     16   108 use Const::Fast;
  16         32  
  16         151  
54 16     16   11611 use Data::Dumper ();
  16         113893  
  16         658  
55 16         1012 use Params::Util qw(
56             _INSTANCE
57 16     16   1612 );
  16         7057  
58 16     16   2619 use Try::Tiny;
  16         9729  
  16         912  
59              
60 16         7411 use Kafka qw(
61             %ERROR
62             $ERROR_MISMATCH_ARGUMENT
63 16     16   455 );
  16         32  
64              
65              
66              
67             =head1 SYNOPSIS
68              
69             use 5.010;
70             use strict;
71             use warnings;
72              
73             use Kafka::Internals qw(
74             $MAX_SOCKET_REQUEST_BYTES
75             );
76              
77             my $bin_stream_size = $MAX_SOCKET_REQUEST_BYTES;
78              
79             =head1 DESCRIPTION
80              
81             This module is private and should not be used directly.
82              
83             In order to achieve better performance, functions of this module do
84             not perform arguments validation.
85              
86             =head2 EXPORT
87              
88             The following constants are available for export
89              
90             =cut
91              
92             #-- Api Keys
93              
94             =head3 C<$APIKEY_PRODUCE>
95              
96             The numeric code that the C in the request take for the C request type.
97              
98             =cut
99             const our $APIKEY_PRODUCE => 0;
100              
101             =head3 C<$APIKEY_FETCH>
102              
103             The numeric code that the C in the request take for the C request type.
104              
105             =cut
106             const our $APIKEY_FETCH => 1;
107              
108             =head3 C<$APIKEY_OFFSET>
109              
110             The numeric code that the C in the request take for the C request type.
111              
112             =cut
113             const our $APIKEY_OFFSET => 2;
114              
115             =head3 C<$APIKEY_METADATA>
116              
117             The numeric code that the C in the request take for the C request type.
118              
119             =cut
120             const our $APIKEY_METADATA => 3;
121             # The numeric code that the ApiKey in the request take for the C request type.
122             const our $APIKEY_LEADERANDISR => 4; # Not used now
123             # The numeric code that the ApiKey in the request take for the C request type.
124             const our $APIKEY_STOPREPLICA => 5; # Not used now
125              
126             =head3 C<$APIKEY_OFFSETCOMMIT>
127              
128             The numeric code that the ApiKey in the request take for the C request type.
129              
130             =cut
131             const our $APIKEY_OFFSETCOMMIT => 8;
132              
133             =head3 C<$APIKEY_OFFSETFETCH>
134              
135             The numeric code that the ApiKey in the request take for the C request type.
136              
137             =cut
138             const our $APIKEY_OFFSETFETCH => 9;
139              
140             =head3 C<$APIKEY_FINDCOORDINATOR>
141              
142             The numeric code that the C in the request take for the C request type.
143              
144             =cut
145             const our $APIKEY_FINDCOORDINATOR => 10;
146              
147             =head3 C<$APIKEY_APIVERSIONS>
148              
149             The numeric code that the C in the request take for the C request type.
150              
151             =cut
152             const our $APIKEY_APIVERSIONS => 18;
153              
154             # Important configuration properties
155              
156             =head3 C<$MAX_SOCKET_REQUEST_BYTES>
157              
158             The maximum number of bytes in a socket request.
159              
160             The maximum size of a request that the socket server will accept.
161             Default limit (as configured in F) is 104857600.
162              
163             =cut
164             const our $MAX_SOCKET_REQUEST_BYTES => 100 * 1024 * 1024;
165              
166             =head3 C<$PRODUCER_ANY_OFFSET>
167              
168             According to Apache Kafka documentation: 'When the producer is sending messages it doesn't actually know the offset and can fill in any
169             value here it likes.'
170              
171             =cut
172             const our $PRODUCER_ANY_OFFSET => 0;
173              
174             =head3 C<$MAX_CORRELATIONID>
175              
176             Largest positive integer on 32-bit machines.
177              
178             =cut
179             const our $MAX_CORRELATIONID => 0x7fffffff;
180              
181             =head3 C<$MAX_INT32>
182              
183             Largest positive integer on 32-bit machines.
184              
185             =cut
186             const our $MAX_INT32 => 0x7fffffff;
187              
188             =head3 C<$MAX_INT16>
189              
190             Largest positive int16 value.
191              
192             =cut
193             const our $MAX_INT16 => 0x7fff;
194              
195             #-- public functions -----------------------------------------------------------
196              
197             #-- private functions ----------------------------------------------------------
198              
199             # Used to generate a CorrelationId.
200             sub _get_CorrelationId {
201 10174     10174   94633 return( -int( rand( $MAX_CORRELATIONID ) ) );
202             }
203              
204             # Verifies that the argument is of Math::BigInt type
205             sub _isbig {
206 10386     10386   19395 my ( $num ) = @_;
207              
208 10386         249716 return defined _INSTANCE( $num, 'Math::BigInt' );
209             }
210              
211             #-- public attributes ----------------------------------------------------------
212              
213             =head2 METHODS
214              
215             The following methods are defined in the C:
216              
217             =cut
218              
219             #-- public methods -------------------------------------------------------------
220              
221             =head3 C
222              
223             Gets or sets debug level for a particular L module, based on environment variable C or flags.
224              
225             $flags - (string) argument that can be used to pass coma delimited module names (omit C).
226              
227             Returns C<$DEBUG> level for the module from which C was called.
228              
229             =cut
230             our %_debug_levels; # per-module levels cache to speed-up multiple calls to debug_level()
231             sub debug_level {
232 16     16   139 no strict 'refs'; ## no critic
  16         41  
  16         10507  
233              
234 36545   66 36545 1 10598186 my $class = ref( $_[0] ) || $_[0];
235              
236 36545 100 100     136680 return ${ $_debug_levels{ $class } } if @_ == 1 && exists $_debug_levels{ $class };
  36516         93547  
237              
238 29   100     148 my $flags = $_[1] // $ENV{PERL_KAFKA_DEBUG};
239 29 100       88 if( defined $flags ) {
240 9         50 foreach my $spec ( split /\s*,\s*/, $flags ) {
241 12         104 my @elements = split( /\s*:\s*/, $spec, 2 );
242 12         25 my( $module_name, $level );
243 12 100       32 if ( scalar( @elements ) > 1 ) {
244 8         21 ( $module_name, $level ) = @elements;
245             } else {
246 4         28 $module_name = ( $class =~ /([^:]+)$/ )[0];
247 4         40 $level = $spec;
248             }
249              
250 12         24 *{ "Kafka::${module_name}::DEBUG" } = \$level; ## no critic
  12         82  
251             }
252             }
253              
254 29         64 $_debug_levels{ $class } = \${ "${class}::DEBUG" }; ## no critic
  29         222  
255              
256 29         67 return ${ $_debug_levels{ $class } };
  29         138  
257             }
258              
259             =head2 format_reference
260              
261             say format_reference( $object );
262              
263             Dumps reference using preconfigured L. Produces less verbose
264             output than default L settings.
265              
266             =cut
267              
268             my $dumper;
269             my $empty_array = [];
270              
271             sub format_reference {
272 62     62 1 133 my ( $value ) = @_;
273              
274 62 100       177 unless( $dumper ) {
275 5         122 $dumper = Data::Dumper->new( $empty_array )
276             ->Indent( 0 )
277             ->Terse( 1 )
278             ->Quotekeys( 0 )
279             ->Sortkeys( 1 )
280             ->Useperl( 1 ) # XS version seems to have a bug which sometimes results in modification of original object
281             # ->Sparseseen( 1 ) # speed up since we don't use "Seen" hash
282             ;
283             }
284              
285 62         956 my $r;
286 62 100 66     209 if (
287             overload::Overloaded( $value ) &&
288             overload::Method( $value, '""' )
289             ) {
290 8         2368 $r = "$value"; # force stringification
291             } else {
292 54         3042 $r = $dumper->Values( [ $value ] )->Dump;
293 54         11972 $dumper->Reset->Values( $empty_array );
294             }
295              
296 62         25207 return $r;
297             }
298              
299             =head2 format_message
300              
301             $string = format_message( 'Object %d loaded. Status: %s', $id, $message );
302              
303             Returns string formatted using printf-style syntax.
304              
305             If there are more than one argument and the first argument contains C<%...>
306             conversions, arguments are converted to a string message using C. In this case, undefined
307             values are printed as C<< >> and references are converted to strings using L.
308              
309             =cut
310             sub format_message {
311 825   50 825 1 3322 my $format = shift // return;
312              
313 825         4733 my $got = scalar @_;
314              
315 825 50 33     4429 return $format unless $got && $format =~ /\%/;
316              
317 825         1433 my $expected = 0;
318 825         5588 while ( $format =~ /(%%|%[^%])/g ) {
319 1821 50       5530 next if $1 eq '%%'; # don't count escape sequence
320 1821         6770 ++$expected;
321             }
322              
323 825 50       1883 Carp::cluck "Wrong number of arguments: $expected vs $got" unless $got == $expected;
324              
325             return sprintf $format, map {
326 825 100       1825 !defined $_
  1821 100       11390  
327             ? ''
328             : ref $_
329             ? format_reference( $_ )
330             : $_
331             } @_;
332             }
333              
334             #-- private attributes ---------------------------------------------------------
335              
336             #-- private methods ------------------------------------------------------------
337              
338             1;
339              
340             __END__