File Coverage

blib/lib/Net/Graphite.pm
Criterion Covered Total %
statement 65 111 58.5
branch 24 60 40.0
condition 7 17 41.1
subroutine 12 16 75.0
pod 7 9 77.7
total 115 213 53.9


line stmt bran cond sub pod time code
1             package Net::Graphite;
2 4     4   278165 use strict;
  4         36  
  4         119  
3 4     4   20 use warnings;
  4         8  
  4         125  
4 4     4   1993 use Errno qw(EINTR);
  4         5645  
  4         409  
5 4     4   30 use Carp qw/confess/;
  4         7  
  4         171  
6 4     4   2132 use IO::Socket::INET;
  4         78707  
  4         27  
7 4     4   1861 use Scalar::Util qw/reftype/;
  4         10  
  4         5177  
8              
9             $Net::Graphite::VERSION = '0.19';
10              
11             our $TEST = 0; # if true, don't send anything to graphite
12              
13             sub new {
14 6     6 0 2241 my $class = shift;
15 6 100 66     40 my %args = @_ == 1 && ref $_[0] eq 'HASH' ? %{$_[0]} : @_;
  1         6  
16              
17 6         72 return bless {
18             host => '127.0.0.1',
19             port => 2003,
20             fire_and_forget => 0,
21             return_connect_error => 0,
22             proto => 'tcp',
23             timeout => 1,
24             tcp_buffer_size => 64 * 1024,
25             max_retries => 1,
26             # flush_limit
27             # path
28             # transformer
29             %args,
30              
31             # private
32             _flush_buffer => [],
33             # _socket
34             }, $class;
35             }
36              
37             sub send {
38 6     6 1 4449 my $self = shift;
39 6         11 my $value;
40 6 100       23 $value = shift if @_ % 2; # single value passed in
41 6         20 my %args = @_;
42              
43 6 100       19 if ($args{data}) {
44 3   33     19 my $xform = $args{transformer} || $self->transformer;
45 3 50       8 if ($xform) {
46 0         0 push @{$self->{_flush_buffer}}, $xform->($args{data});
  0         0  
47             }
48             else {
49 3 100       12 if (ref $args{data}) {
    50          
50 2         11 my $reftype = reftype $args{data};
51              
52             # default transformers
53 2 50       17 if ($reftype eq 'HASH') {
54             # hash structure from Yves
55 2 100       36 my $start_path = $args{path} ? $args{path} : $self->path;
56 2         3 foreach my $epoch (sort {$a <=> $b} keys %{ $args{data} }) {
  2         10  
  2         13  
57 4         11 $self->_fill_lines_for_epoch($epoch, $args{data}{$epoch}, $start_path);
58             }
59             }
60             # TODO - not sure what structure is most useful;
61             # an aref of [$path, $value, $epoch] seems a bit trivial?
62             # elsif ($reftype eq 'ARRAY') {
63             #
64             # }
65             # TODO
66             # elsif ($reftype eq 'CODE') {
67             # my $iter = $args{data};
68             # while (my $text = $iter->()) {
69             # $plaintext .= $text;
70             # }
71             # }
72             # how about sth of DBI? XML? maybe not
73             else {
74 0         0 confess "Arg 'data' passed to send method is a ref but has no transformer";
75             }
76             }
77             elsif ( length $args{data} ) {
78             # passed plaintext without a transformer
79 1         2 push @{$self->{_flush_buffer}}, $args{data};
  1         4  
80             }
81             else {
82             # Empty request?
83             }
84             }
85             }
86             else {
87 3 100       9 $value = $args{value} unless defined $value;
88 3   66     14 my $path = $args{path} || $self->path;
89 3   66     10 my $time = $args{time} || time;
90              
91 3         6 push @{$self->{_flush_buffer}}, "$path $value $time\n";
  3         14  
92             }
93              
94 6         37 $self->flush();
95              
96             # This join can get somewhat heavy, so don't do it unless explicitly
97             # requested.
98 6 50       26 return join('', @{ $self->{_flush_buffer} })
  6         36  
99             if defined wantarray;
100              
101 0         0 return;
102             }
103              
104             sub flush {
105 6     6 1 14 my ($self) = @_;
106             my $flush_buffer = $self->{_flush_buffer}
107 6 50       27 or return;
108 6 50       31 return unless @$flush_buffer;
109              
110 6 50       20 $self->trace($flush_buffer) if $self->{trace};
111              
112             # Do not do anything if we are just testing
113 6 50       17 return if $Net::Graphite::TEST;
114              
115             # If connection failed we already notified about it elsewhere, so just
116             # return.
117 0 0       0 return unless $self->connect();
118              
119 0   0     0 my $size_limit = $self->{tcp_buffer_size} || 64 * 1024;
120 0         0 my $retries = 0;
121              
122             FLUSH_BUFFER:
123 0         0 while ( @$flush_buffer ) {
124 0         0 my @batch_send = shift @$flush_buffer;
125 0         0 my $batch_size = bytes::length( $batch_send[0] );
126 0         0 while ( @$flush_buffer ) {
127 0         0 my $msg_size = bytes::length( $flush_buffer->[0] );
128              
129 0 0       0 last if $batch_size + $msg_size > $size_limit;
130              
131 0         0 push @batch_send, shift @$flush_buffer;
132             }
133              
134 0         0 my $buf = join '', @batch_send;
135 0         0 while (length($buf)) {
136             # We are using send() here rather than calling a method on the
137             # _socket object to avoid an unnecessary syscall. It would call
138             # getpeername() every single time. It does that do check whether
139             # the fourth argument is needed and we know that for open TCP
140             # sockets it is not.
141 0         0 my $res = CORE::send( $self->{_socket}, $buf, 0 );
142              
143 0 0       0 if (not defined $res) {
144 0 0       0 redo if $! == EINTR;
145              
146             # close/forget the socket, because it is most likely broken in
147             # some way. This will force a re-open on next operation.
148 0         0 delete $self->{_socket};
149              
150             # Bail out early if it was "fire and forget" request; do not
151             # put the unsent data back in the buffer in that case.
152 0 0       0 return if $self->{fire_and_forget};
153              
154             # Put back the unsent data, so we can retry it.
155             # We do not know how much data was actually unprocessed, so
156             # play it safe and put back everything. Normally it is ok to
157             # overwrite a data point in graphite with the same data, so
158             # sending it twice won't be a problem.
159 0         0 unshift @$flush_buffer, @batch_send;
160              
161             # Reconnect and retry
162             confess "Error sending data"
163 0 0       0 if ++$retries > $self->{max_retries};
164              
165 0         0 $self->connect();
166              
167 0         0 redo FLUSH_BUFFER;
168             }
169              
170 0         0 substr($buf, 0, $res, '');
171             }
172 0 0 0     0 if (length($buf) && not $self->{fire_and_forget}) {
173 0         0 confess "Error sending data";
174             }
175             }
176              
177             # On success clear the buffer. The array itself should be empty already,
178             # but shift() won't clear the offset in the underlying data structure,
179             # so the array would potentially keep growing forever.
180 0         0 @$flush_buffer = ();
181              
182 0         0 return;
183             }
184              
185             sub _fill_lines_for_epoch {
186 28     28   51 my ($self, $epoch, $hash, $path) = @_;
187              
188             # still in the "branches"
189 28 100       46 if (ref $hash) {
190 12         32 foreach my $key (sort keys %$hash) {
191 24         37 my $value = $hash->{$key};
192 24         64 $self->_fill_lines_for_epoch($epoch, $value, "$path.$key");
193             }
194             }
195             # reached the "leaf" value
196             else {
197 16         19 push @{ $self->{_flush_buffer} }, "$path $hash $epoch\n";
  16         94  
198             }
199             }
200              
201             sub connect {
202 0     0 1 0 my $self = shift;
203             return $self->{_socket}
204 0 0       0 if $self->{_socket};
205              
206             $self->{_socket} = IO::Socket::INET->new(
207             PeerHost => $self->{host},
208             PeerPort => $self->{port},
209             Proto => $self->{proto},
210             Timeout => $self->{timeout},
211 0         0 );
212              
213 0 0       0 unless ($self->{_socket}) {
214 0 0       0 if ($self->{return_connect_error}) {
    0          
215             # This is probably only used if you call $graphite->connect before ->send
216             # in order to check if there is a connection;
217             # otherwise, it'll just "forget" (without even "firing").
218 0         0 return;
219             }
220             elsif (not $self->{fire_and_forget}) {
221 0         0 confess "Error creating socket: $!";
222             }
223             }
224 0         0 return $self->{_socket};
225             }
226              
227             # if you need to close/flush for some reason
228             sub close {
229 0     0 1 0 my $self = shift;
230 0 0       0 return unless my $socket = delete $self->{_socket};
231 0         0 $socket->close();
232             }
233              
234             sub trace {
235 0     0 0 0 my (undef, $val_line) = @_;
236 0         0 print STDERR $val_line;
237             }
238              
239             ### mutators
240             sub flush_limit {
241 0     0 1 0 my ($self, $limit) = @_;
242 0 0       0 $self->{flush_limit} = $limit if defined $limit;
243 0         0 return $self->{flush_limit};
244             }
245             sub path {
246 3     3 1 9 my ($self, $path) = @_;
247 3 50       8 $self->{path} = $path if defined $path;
248 3         12 return $self->{path};
249             }
250             sub transformer {
251 3     3 1 7 my ($self, $xform) = @_;
252 3 50       8 $self->{transformer} = $xform if defined $xform;
253 3         21 return $self->{transformer};
254             }
255              
256             1;
257             __END__