File Coverage

blib/lib/Fluent/Logger.pm
Criterion Covered Total %
statement 195 254 76.7
branch 44 80 55.0
condition 9 25 36.0
subroutine 35 40 87.5
pod 4 6 66.6
total 287 405 70.8


line stmt bran cond sub pod time code
1             # -*- coding: utf-8; -*-
2             package Fluent::Logger;
3              
4 8     8   412035 use strict;
  8         23  
  8         192  
5 8     8   39 use warnings;
  8         13  
  8         283  
6              
7             our $VERSION = '0.27';
8              
9 8     8   3180 use IO::Select;
  8         11855  
  8         321  
10 8     8   404 use IO::Socket::INET;
  8         17319  
  8         81  
11 8     8   4088 use IO::Socket::UNIX;
  8         18  
  8         72  
12 8     8   6410 use Data::MessagePack;
  8         7355  
  8         230  
13 8     8   3417 use Time::Piece;
  8         67314  
  8         32  
14 8     8   489 use Carp;
  8         16  
  8         435  
15 8     8   44 use Scalar::Util qw/ refaddr /;
  8         14  
  8         329  
16 8     8   443 use Time::HiRes qw/ time /;
  8         947  
  8         49  
17 8     8   4845 use UUID::Tiny qw/ create_uuid UUID_V4 /;
  8         86621  
  8         574  
18 8     8   54 use MIME::Base64 qw/ encode_base64 /;
  8         14  
  8         417  
19              
20 8     8   61 use constant RECONNECT_WAIT => 0.5;
  8         43  
  8         383  
21 8     8   36 use constant RECONNECT_WAIT_INCR_RATE => 1.5;
  8         14  
  8         319  
22 8     8   41 use constant RECONNECT_WAIT_MAX => 60;
  8         14  
  8         365  
23 8     8   41 use constant RECONNECT_WAIT_MAX_COUNT => 12;
  8         18  
  8         409  
24              
25 8     8   43 use constant MP_HEADER_3ELM_ARRAY => "\x93";
  8         14  
  8         374  
26 8     8   43 use constant MP_HEADER_4ELM_ARRAY => "\x94";
  8         15  
  8         432  
27 8     8   42 use constant MP_HEADER_EVENT_TIME => "\xd7\x00";
  8         9  
  8         391  
28              
29 8     8   3554 use subs 'prefer_integer';
  8         157  
  8         40  
30              
31             use Class::Tiny +{
32             tag_prefix => sub {},
33 1         41 host => sub { "127.0.0.1" },
34 0         0 port => sub { 24224 },
35             socket => sub {},
36 5         256 timeout => sub { 3.0 },
37 0         0 buffer_limit => sub { 8 * 1024 * 1024 }, # fixme
38 1         23 buffer_overflow_handler => sub { undef },
39 0         0 truncate_buffer_at_overflow => sub { 0 },
40 0         0 max_write_retry => sub { 5 },
41 2         57 write_length => sub { 8 * 1024 * 1024 },
42             socket_io => sub {},
43 2         30 errors => sub { [] },
44 4         52 prefer_integer => sub { 1 },
45             packer => sub {
46 4         43 my $self = shift;
47 4         124 my $mp = Data::MessagePack->new;
48 4         82 $mp->prefer_integer( $self->prefer_integer );
49 4         60 $mp;
50             },
51 0         0 pending => sub { "" },
52 2         36 connect_error_history => sub { +[] },
53             owner_pid => sub {},
54 4         39 event_time => sub { 0 },
55 4         39 ack => sub { 0 },
56 0         0 pending_acks => sub { +[] },
57             unpacker => sub {
58 0         0 require Data::MessagePack::Stream;
59 0         0 Data::MessagePack::Stream->new;
60             },
61             selector => sub { },
62 4         50 retry_immediately => sub { 0 },
63 8     8   5772 };
  8         12465  
  8         227  
64              
65             sub BUILD {
66 5     5 0 2019147 my $self = shift;
67 5         111 $self->_connect;
68             }
69              
70             sub prefer_integer {
71 4     4   11 my $self = shift;
72              
73 4 50       22 if (@_) {
    50          
74 0         0 $self->{prefer_integer} = shift;
75 0         0 $self->packer->prefer_integer( $self->prefer_integer );
76             } elsif ( exists $self->{prefer_integer} ) {
77 0         0 return $self->{prefer_integer};
78             } else {
79 4         32 my $defaults = Class::Tiny->get_all_attribute_defaults_for( ref $self );
80 4         348 return $self->{prefer_integer} = $defaults->{prefer_integer}->();
81             }
82             }
83              
84             sub _carp {
85 28     28   47 my $self = shift;
86 28         54 my $msg = shift;
87 28         59 chomp $msg;
88 28         105 carp(
89             sprintf "%s %s[%s](%s): %s",
90             localtime->strftime("%Y-%m-%dT%H:%M:%S%z"),
91             ref $self,
92             refaddr $self,
93             $self->_connect_info,
94             $msg,
95             );
96             }
97              
98             sub _add_error {
99 24     24   42 my $self = shift;
100 24         48 my $msg = shift;
101 24         84 $self->_carp($msg);
102 24         7591 push @{ $self->errors }, $msg;
  24         527  
103             }
104              
105             sub errstr {
106 0     0 1 0 my $self = shift;
107 0         0 return join ("\n", @{ $self->errors });
  0         0  
108             }
109              
110             sub _connect_info {
111 28     28   2876 my $self = shift;
112 28 50       530 $self->socket || sprintf "%s:%d", $self->host, $self->port;
113             }
114              
115             sub _connect {
116 17     17   48 my $self = shift;
117 17         46 my $force = shift;
118              
119 17 50 33     773 return if $self->socket_io && !$force;
120              
121 17 100       344 my $sock = defined $self->socket
122             ? IO::Socket::UNIX->new( Peer => $self->socket )
123             : IO::Socket::INET->new(
124             PeerAddr => $self->host,
125             PeerPort => $self->port,
126             Proto => 'tcp',
127             Timeout => $self->timeout,
128             ReuseAddr => 1,
129             );
130 17 100       13289 if (!$sock) {
131 14         105 $self->_add_error("Can't connect: $!");
132 14         80 push @{ $self->connect_error_history }, time;
  14         203  
133 14 50       96 if (@{ $self->connect_error_history } > RECONNECT_WAIT_MAX_COUNT) {
  14         183  
134 0         0 shift @{ $self->connect_error_history };
  0         0  
135             }
136 14         89 return;
137             }
138 3         127 $self->connect_error_history([]);
139 3         104 $self->owner_pid($$);
140 3         84 $self->selector(IO::Select->new($sock));
141 3         353 $self->socket_io($sock);
142             }
143              
144             sub close {
145 7     7 1 16 my $self = shift;
146              
147 7 100       34 if ( length $self->{pending} ) {
148 2         16 $self->_carp("flushing pending data on close");
149 2 50       541 $self->_connect unless $self->socket_io;
150 2         21 my $written = eval {
151 2         8 $self->_write( $self->{pending} );
152             };
153 2 50 33     37 if ($@ || !$written) {
154 2         21 my $size = length $self->{pending};
155 2         13 $self->_carp("Can't send pending data. LOST $size bytes.: $@");
156 2         436 $self->_call_buffer_overflow_handler();
157             } else {
158 0         0 $self->_carp("pending data was flushed successfully");
159             }
160             };
161 7         29 $self->{pending} = "";
162 7         15 $self->{pending_acks} = [];
163 7         42 delete $self->{selector};
164 7         15 my $socket = delete $self->{socket_io};
165 7 100       97 $socket->close if $socket;
166             }
167              
168             sub post {
169 28     28 1 20055 my($self, $tag, $msg) = @_;
170              
171 28   50     253 $self->_post( $tag || "", $msg, time() );
172             }
173              
174             sub post_with_time {
175 0     0 1 0 my ($self, $tag, $msg, $time) = @_;
176              
177 0   0     0 $self->_post( $tag || "", $msg, $time );
178             }
179              
180             sub _pack_time {
181 28     28   60 my ($self, $time) = @_;
182              
183 28 50       390 if ($self->event_time) {
184 0         0 my $time_i = int $time;
185 0         0 my $nanosec = int(($time - $time_i) * 10 ** 9);
186 0         0 return MP_HEADER_EVENT_TIME . pack("NN", $time_i, $nanosec);
187             } else {
188 28         419 return $self->packer->pack(int $time);
189             }
190             }
191              
192             sub _post {
193 28     28   73 my ($self, $tag, $msg, $time) = @_;
194              
195 28 50       121 if (ref $msg ne "HASH") {
196 0         0 $self->_add_error("message '$msg' must be a HashRef");
197 0         0 return;
198             }
199              
200 28 50       560 $tag = join('.', $self->tag_prefix, $tag) if $self->tag_prefix;
201 28         524 my $p = $self->packer;
202 28         383 $self->_send(
203             $p->pack($tag),
204             $self->_pack_time($time),
205             $p->pack($msg),
206             );
207             }
208              
209             sub _send {
210 28     28   6332 my ($self, @args) = @_;
211              
212 28         45 my ($data, $unique_key);
213 28 50       533 if ( $self->ack ) {
214 0         0 $unique_key = encode_base64(create_uuid(UUID_V4));
215 0         0 $data = join('', MP_HEADER_4ELM_ARRAY, @args, $self->{packer}->pack({ chunk => $unique_key }));
216 0         0 push @{$self->{pending_acks}}, $unique_key;
  0         0  
217             } else {
218 28         11031 $data = join('', MP_HEADER_3ELM_ARRAY, @args);
219             }
220              
221 28         88 my $prev_size = length($self->{pending});
222 28         39 my $current_size = length($data);
223 28         14744 $self->{pending} .= $data;
224              
225 28         62 my $errors = @{ $self->connect_error_history };
  28         847  
226 28 100 100     486 if ( $errors && length $self->pending <= $self->buffer_limit )
227             {
228 16         22404 my $suppress_sec;
229 16 50       33 if ( $errors < RECONNECT_WAIT_MAX_COUNT ) {
230 16         49 $suppress_sec = RECONNECT_WAIT * (RECONNECT_WAIT_INCR_RATE ** ($errors - 1));
231             } else {
232 0         0 $suppress_sec = RECONNECT_WAIT_MAX;
233             }
234 16 50       221 if ( time - $self->connect_error_history->[-1] < $suppress_sec ) {
235 16         2112 return;
236             }
237             }
238              
239 12         33795 my ($written, $error);
240 12         216 for ( 0 .. $self->retry_immediately ) {
241             # check owner pid for fork safe
242 12 100 66     217 if (!$self->socket_io || $self->owner_pid != $$) {
243 10         53 $self->_connect(1);
244             }
245 12         102 eval {
246 12         56 $written = $self->_write( $self->{pending} );
247             my $acked = $self->ack
248 2 50       47 ? $self->_wait_ack(@{ $self->{pending_acks} })
  0         0  
249             : 1;
250 2 50 33     37 if ($written && $acked) {
251 2         8 $self->{pending} = "";
252 2         6 $self->{pending_acks} = [];
253             }
254             };
255 12 100       187 if (!$@) {
256 2         35 return $written;
257             }
258 10         21 my $e = $@;
259 10         26 $error = "Cannot send data: $e";
260 10         29 my $sock = delete $self->{socket_io};
261 10 50       25 $sock->close if $sock;
262 10         15 delete $self->{selector};
263              
264 10 50       168 if ( length($self->{pending}) > $self->buffer_limit ) {
265 10 100       185 if ( defined $self->buffer_overflow_handler ) {
    50          
266 1         9 $self->_call_buffer_overflow_handler();
267 1         2 $self->{pending} = "";
268 1 50       15 $self->{pending_acks} = [] if $self->ack;
269             } elsif ( $self->truncate_buffer_at_overflow ) {
270 9         186 substr($self->{pending}, $prev_size, $current_size, "");
271 9 50       116 pop @{$self->{pending_acks}} if $self->ack;
  0         0  
272             }
273             }
274             }
275              
276 10 50       106 $self->_add_error($error) if defined $error;
277 10         170 return $written;
278             }
279              
280             sub _wait_ack {
281 0     0   0 my $self = shift;
282 0         0 my @acks = @_;
283              
284 0         0 my $up = $self->unpacker;
285 0     0   0 local $SIG{"PIPE"} = sub { die $! };
  0         0  
286             READ:
287 0         0 while (1) {
288 0         0 my ($s) = $self->selector->can_read($self->timeout);
289 0 0       0 if (!$s) {
290 0         0 die "ack read timed out";
291             }
292 0         0 $s->sysread(my $buf, 1024);
293 0 0 0     0 return if @acks > 0 && length($buf) == 0;
294 0         0 $up->feed($buf);
295 0         0 while ($up->next) {
296 0         0 my $ack = $up->data;
297 0         0 my $unique_key = shift @acks;
298 0 0 0     0 if ($unique_key && ref $ack eq "HASH") {
299 0 0       0 if ($ack->{ack} ne $unique_key) {
300 0         0 die "ack is not expected: " . $ack->{ack};
301             }
302             } else {
303 0         0 unshift @{ $self->{pending_acks} }, $unique_key;
  0         0  
304 0         0 die "Can't send data. ack is not expected. $@";
305             }
306 0 0       0 last READ if @acks == 0;
307             }
308             }
309 0         0 return 1;
310             }
311              
312             sub _call_buffer_overflow_handler {
313 3     3   7 my $self = shift;
314 3 100       52 if (my $handler = $self->buffer_overflow_handler) {
315 2         12 eval {
316 2         7 $handler->($self->{pending});
317             };
318 2 50       8 if (my $error = $@) {
319 0         0 $self->_add_error("Can't call buffer overflow handler: $error");
320             }
321             }
322             }
323              
324             sub _write {
325 14     14   29 my $self = shift;
326 14         26140 my $data = shift;
327 14         62 my $length = length($data);
328 14         38 my $retry = my $written = 0;
329 14 100       419 die "Connection is not available" unless $self->socket_io;
330              
331 2     0   81 local $SIG{"PIPE"} = sub { die $! };
  0         0  
332              
333 2         10 while ($written < $length) {
334 2         45 my ($s) = $self->selector->can_write($self->timeout);
335 2 50       117 die "send write timed out" unless $s;
336 2         43 my $nwrite
337             = $s->syswrite($data, $self->write_length, $written);
338              
339 2 50       105 if (!$nwrite) {
340 0 0       0 if ($retry > $self->max_write_retry) {
341 0         0 die "failed write retry; max write retry count. $!";
342             }
343 0         0 $retry++;
344             } else {
345 2         9 $written += $nwrite;
346             }
347             }
348 2         36 $written;
349             }
350              
351             sub DEMOLISH {
352 5     5 0 6394 my $self = shift;
353 5         26 $self->close;
354             }
355              
356              
357             1;
358             __END__