File Coverage

blib/lib/Net/Statsd/Server/Backend/Graphite.pm
Criterion Covered Total %
statement 145 209 69.3
branch 12 36 33.3
condition 3 9 33.3
subroutine 17 21 80.9
pod 0 11 0.0
total 177 286 61.8


line stmt bran cond sub pod time code
1             #
2             # ABSTRACT: Flushes stats to graphite (http://graphite.wikidot.com/).
3             #
4             # To enable this backend, include 'graphite' in the backends
5             # configuration array:
6             #
7             # backends: ['graphite']
8             #
9             # This backend supports the following config options:
10             #
11             # graphiteHost: Hostname of graphite server.
12             # graphitePort: Port to contact graphite server at.
13             #
14              
15             package Net::Statsd::Server::Backend::Graphite;
16             $Net::Statsd::Server::Backend::Graphite::VERSION = '0.20';
17 2     2   816 use 5.008;
  2         4  
18 2     2   5 use strict;
  2         3  
  2         27  
19 2     2   6 use warnings;
  2         2  
  2         38  
20 2     2   6 use base qw(Net::Statsd::Server::Backend);
  2         1  
  2         453  
21              
22 2     2   551 use AnyEvent::Log;
  2         12761  
  2         55  
23 2     2   7 use Carp ();
  2         2  
  2         24  
24 2     2   878 use IO::Socket::INET ();
  2         25336  
  2         32  
25 2     2   9 use Time::HiRes ();
  2         2  
  2         41  
26              
27             use constant {
28 2         2661 fmt_FLOAT => '%.6f',
29             fmt_INT => '%d',
30             fmt_STR => '%s',
31             fmt_TIME => '%d',
32 2     2   6 };
  2         12  
33              
34 6 100   6   13 sub _dor { defined $_[0] ? $_[0] : $_[1] }
35              
36             sub init {
37 1     1 0 2 my ($self, $startup_time, $config) = @_;
38              
39 1         2 for (qw(debug graphiteHost graphitePort)) {
40 3         7 $self->{$_} = $config->{$_};
41             }
42              
43 1   50     3 $config->{graphite} ||= {};
44              
45 1         4 my $globalPrefix = _dor($config->{graphite}->{globalPrefix} , "stats");
46 1         3 my $prefixCounter = _dor($config->{graphite}->{prefixCounter}, "counters");
47 1         2 my $prefixTimer = _dor($config->{graphite}->{prefixTimer} , "timers");
48 1         2 my $prefixGauge = _dor($config->{graphite}->{prefixGauge} , "gauges");
49 1         2 my $prefixSet = _dor($config->{graphite}->{prefixSet} , "sets");
50 1         2 my $legacyNamespace = _dor($config->{graphite}->{legacyNamespace} , 1);
51              
52 1         1 my $globalNamespace = [];
53 1         1 my $counterNamespace = [];
54 1         1 my $timerNamespace = [];
55 1         1 my $gaugesNamespace = [];
56 1         1 my $setsNamespace = [];
57              
58 1 50       2 if ($legacyNamespace) {
59              
60 0         0 $globalNamespace = ["stats"];
61 0         0 $counterNamespace = ["stats"];
62 0         0 $timerNamespace = ["stats", "timers"];
63 0         0 $gaugesNamespace = ["stats", "gauges"];
64 0         0 $setsNamespace = ["stats", "sets"];
65              
66             }
67             else {
68              
69 1 50       2 if ($globalPrefix ne "") {
70 1         1 push @{ $globalNamespace }, $globalPrefix;
  1         2  
71 1         1 push @{ $counterNamespace }, $globalPrefix;
  1         1  
72 1         1 push @{ $timerNamespace }, $globalPrefix;
  1         1  
73 1         1 push @{ $gaugesNamespace }, $globalPrefix;
  1         2  
74 1         0 push @{ $setsNamespace }, $globalPrefix;
  1         1  
75             }
76              
77 1 50       3 if ($prefixCounter ne "") {
78 1         1 push @{ $counterNamespace }, $prefixCounter;
  1         1  
79             }
80              
81 1 50       2 if ($prefixTimer ne "") {
82 1         1 push @{ $timerNamespace }, $prefixTimer;
  1         1  
83             }
84              
85 1 50       3 if ($prefixGauge ne "") {
86 1         1 push @{ $gaugesNamespace }, $prefixGauge;
  1         0  
87             }
88              
89 1 50       2 if ($prefixSet ne "") {
90 1         1 push @{ $setsNamespace }, $prefixSet;
  1         1  
91             }
92              
93             }
94              
95 1         2 $self->{globalPrefix} = $globalPrefix;
96 1         1 $self->{prefixCounter} = $prefixCounter;
97 1         2 $self->{prefixTimer} = $prefixTimer;
98 1         1 $self->{prefixGauge} = $prefixGauge;
99 1         1 $self->{prefixSet} = $prefixSet;
100 1         1 $self->{legacyNamespace} = $legacyNamespace;
101              
102 1         2 $self->{globalNamespace} = $globalNamespace;
103 1         1 $self->{counterNamespace} = $counterNamespace;
104 1         1 $self->{timerNamespace} = $timerNamespace;
105 1         2 $self->{gaugesNamespace} = $gaugesNamespace;
106 1         1 $self->{setsNamespace} = $setsNamespace;
107              
108             #$self->{graphiteStats}->{last_flush} =
109             #$self->{graphiteStats}->{last_exception} = $startup_time;
110              
111 1         1 $self->{flushInterval} = $config->{flushInterval};
112 1         1 $self->{prefixStats} = $config->{prefixStats};
113              
114 1 50       4 if (! $self->{prefixStats}) {
115 0         0 Carp::croak("config.prefixStats should not be blank/empty!");
116             }
117              
118             }
119              
120             sub flush {
121 0     0 0 0 my ($self, $timestamp, $metrics) = @_;
122 0         0 my $flush_stats = $self->flush_stats($timestamp, $metrics);
123 0         0 $self->post_stats($flush_stats);
124             }
125              
126             sub flush_stats {
127 1     1 0 9 my ($self, $ts, $metrics) = @_;
128              
129 1         3 my $startTime = [ Time::HiRes::gettimeofday ];
130 1         1 my $statString = "";
131 1         1 my $num_stats = 0;
132 1         2 my $timer_data_key;
133              
134 1         1 my $counters = $metrics->{counters};
135 1         1 my $gauges = $metrics->{gauges};
136 1         1 my $timers = $metrics->{timers};
137 1         2 my $sets = $metrics->{sets};
138 1         0 my $counter_rates = $metrics->{counter_rates};
139 1         2 my $timer_data = $metrics->{timer_data};
140 1         1 my $statsd_metrics = $metrics->{statsd_metrics};
141              
142             # Accumulate flush statistics into a list
143 1         2 my @fstats;
144              
145 1         0 for my $key (keys %{ $counters }) {
  1         3  
146              
147 2         2 my @namespace = (@{ $self->{counterNamespace} }, $key);
  2         3  
148 2         4 my $namespace = join(".", @namespace);
149              
150 2         2 my $value = $counters->{$key};
151 2         2 my $valuePerSecond = $counter_rates->{$key}; # pre-calculated "per second" rate
152              
153 2 50       3 if ($self->{legacyNamespace}) {
154 0         0 push @fstats, stat_float($namespace, $valuePerSecond, $ts);
155 0         0 push @fstats, stat_int("stats_counts.$key", $value, $ts);
156             } else {
157 2         5 push @fstats, stat_float("$namespace.rate", $valuePerSecond, $ts);
158 2         3 push @fstats, stat_int("$namespace.count", $value, $ts);
159             }
160              
161 2         4 $num_stats++;
162             }
163              
164 1         1 for my $key (keys %{ $timer_data }) {
  1         2  
165 0 0 0     0 if ($timer_data->{$key} && keys %{ $timer_data->{$key} } > 0) {
  0         0  
166 0         0 for my $timer_data_key (keys %{ $timer_data->{$key} }) {
  0         0  
167 0         0 my @namespace = (@{ $self->{timerNamespace} }, $key);
  0         0  
168 0         0 my $the_key = join(".", @namespace);
169             push @fstats, stat_float(
170             "$the_key.$timer_data_key",
171 0         0 $timer_data->{$key}->{$timer_data_key}, $ts
172             );
173             }
174 0         0 $num_stats++;
175             }
176             }
177              
178 1         1 for my $key (keys %{ $gauges }) {
  1         2  
179 0         0 my @namespace = (@{ $self->{gaugesNamespace} }, $key);
  0         0  
180 0         0 push @fstats, stat_float(join(".", @namespace), $gauges->{$key}, $ts);
181 0         0 $num_stats++;
182             }
183              
184 1         1 for my $key (keys %{ $sets }) {
  1         2  
185 0         0 my @namespace = (@{ $self->{setsNamespace} }, $key);
  0         0  
186 0         0 my $set_count = join(".", @namespace, "count");
187 0         0 my $set_len = scalar keys %{ $sets->{$key} };
  0         0  
188 0         0 push @fstats, stat_int($set_count, $set_len, $ts);
189 0         0 $num_stats++;
190             }
191              
192 1         1 my $g_pref = $self->{prefixStats};
193 1         14 my @namespace = (@{ $self->{globalNamespace} }, $g_pref);
  1         3  
194              
195             # Convert Time::HiRes format (µs) to ms
196 1         2 my $calcTime = 1000 * Time::HiRes::tv_interval($startTime);
197              
198 1 50       9 if ($self->{legacyNamespace}) {
199 0         0 push @fstats, stat_int("${g_pref}.numStats", $num_stats, $ts);
200 0         0 push @fstats, stat_float("stats.${g_pref}.graphiteStats.calculationtime",
201             $calcTime, $ts);
202 0         0 for my $key (keys %{ $statsd_metrics }) {
  0         0  
203 0         0 push @fstats, stat_int("stats.${g_pref}.${key}", $statsd_metrics->{$key}, $ts);
204             }
205             }
206             else {
207 1         2 my $namespace = join(".", @namespace);
208 1         2 push @fstats, stat_int("${namespace}.numStats", $num_stats, $ts);
209 1         3 push @fstats, stat_float(
210             "${namespace}.graphiteStats.calculationtime", $calcTime, $ts);
211 1         2 for my $key (keys %{ $statsd_metrics }) {
  1         2  
212 1         1 my $value = $statsd_metrics->{$key};
213 1         3 push @fstats, stat_str("${namespace}.${key}", $value, $ts);
214             }
215             }
216              
217 1         2 my $global_stats = $self->global_stats();
218 1         1 push @fstats, @{ $global_stats };
  1         2  
219              
220 1         3 return \@fstats;
221             }
222              
223             sub global_stats {
224 1     1 0 1 my ($self) = @_;
225              
226 1         1 my $g_pref = $self->{prefixStats}; # "statsd" by default
227 1 50       3 if (! $g_pref) {
228 0         0 Carp::croak("config.prefixStats is empty or invalid! (global_stats)");
229             }
230              
231 1   50     2 my $last_flush = $self->{lastFlush} || 0;
232 1   50     2 my $last_exception = $self->{lastException} || 0;
233 1         1 my $ts = time();
234              
235 1         1 my @namespace = (@{ $self->{globalNamespace} }, $g_pref, 'graphiteStats');
  1         2  
236 1         2 my $namespace = join(".", @namespace);
237              
238 1         3 my $global_stats = [
239             stat_time("${namespace}.last_exception", $last_exception, $ts),
240             stat_time("${namespace}.last_flush", $last_flush, $ts),
241             ];
242              
243 1         2 return $global_stats;
244             }
245              
246             sub post_stats {
247 0     0 0 0 my ($self, $stat_list) = @_;
248              
249 0 0       0 return if ! $self->{graphiteHost};
250              
251             eval {
252 0         0 my $host = $self->{graphiteHost};
253 0         0 my $port = $self->{graphitePort};
254 0 0       0 my $graphite = IO::Socket::INET->new(
255             PeerHost => $host,
256             PeerPort => $port,
257             ) or die "Can't connect to Graphite on ${host}:${port}: $!";
258              
259 0         0 my $stat_string = $self->stats_to_string($stat_list);
260 0         0 $graphite->send($stat_string);
261 0         0 $graphite->close();
262              
263 0         0 $self->{lastFlush} = [Time::HiRes::gettimeofday];
264             }
265 0 0       0 or do {
266 0 0       0 if ($self->{debug}) {
267             # TODO use logger!
268 0         0 warn("Exception while posting stats to Graphite: $@");
269             }
270 0         0 $self->{lastException} = [Time::HiRes::gettimeofday];
271             };
272              
273             }
274              
275             sub stat_float {
276 3     3 0 3 my ($stat, $val, $ts) = @_;
277             return {
278 3         7 stat => $stat,
279             value => $val,
280             time => $ts,
281             fmt => fmt_FLOAT,
282             };
283             }
284              
285             sub stat_int {
286 3     3 0 3 my ($stat, $val, $ts) = @_;
287             return {
288 3         6 stat => $stat,
289             value => $val,
290             time => $ts,
291             fmt => fmt_INT,
292             };
293             }
294              
295             sub stat_str {
296 1     1 0 1 my ($stat, $val, $ts) = @_;
297             return {
298 1         4 stat => $stat,
299             value => $val,
300             time => $ts,
301             fmt => fmt_STR,
302             };
303             }
304              
305             sub stat_time {
306 2     2 0 2 my ($stat, $val, $ts) = @_;
307             return {
308 2         5 stat => $stat,
309             value => $val,
310             time => $ts,
311             fmt => fmt_TIME,
312             };
313             }
314              
315             sub stats_to_string {
316 0     0 0   my ($self, $stat_list) = @_;
317 0           my $stat_string = "";
318 0           for (@{ $stat_list }) {
  0            
319 0           my $attr = $_;
320 0           my $stat = $attr->{stat};
321 0           my $val = $attr->{value};
322 0 0         next if ! defined $val;
323 0           my $ts = $attr->{time};
324 0 0         my $fmt = exists $attr->{fmt} ? $attr->{fmt} : '%d';
325             #warn "fmt=$fmt stat=$stat val=$val ts=$ts\n";
326 0           $stat_string .= sprintf("%s $fmt %d\n", $stat, $val, $ts);
327             }
328 0           return $stat_string;
329             }
330              
331             sub status {
332 0     0 0   my ($self) = @_;
333             return {
334             last_flush => $self->since($self->{lastFlush}),
335 0           last_exception => $self->since($self->{lastException}),
336             };
337             }
338              
339             1;