File Coverage

blib/lib/Net/Statsd/Server/Backend/Graphite.pm
Criterion Covered Total %
statement 146 210 69.5
branch 12 36 33.3
condition 3 9 33.3
subroutine 17 21 80.9
pod 0 11 0.0
total 178 287 62.0


line stmt bran cond sub pod time code
1             #
2             # ABSTRACT: Flushes stats to graphite (http://graphite.wikidot.com/).
3             #
4             # To enable this backend, include 'graphite' in the backends
5             # configuration array:
6             #
7             # backends: ['graphite']
8             #
9             # This backend supports the following config options:
10             #
11             # graphiteHost: Hostname of graphite server.
12             # graphitePort: Port to contact graphite server at.
13             #
14              
15             package Net::Statsd::Server::Backend::Graphite;
16             {
17             $Net::Statsd::Server::Backend::Graphite::VERSION = '0.17';
18             }
19              
20 2     2   2118 use 5.008;
  2         7  
  2         168  
21 2     2   12 use strict;
  2         3  
  2         74  
22 2     2   10 use warnings;
  2         4  
  2         81  
23 2     2   10 use base qw(Net::Statsd::Server::Backend);
  2         4  
  2         687  
24              
25 2     2   1174 use AnyEvent::Log;
  2         28463  
  2         74  
26 2     2   18 use Carp ();
  2         4  
  2         106  
27 2     2   6712 use IO::Socket::INET ();
  2         52413  
  2         48  
28 2     2   21 use Time::HiRes ();
  2         24  
  2         61  
29              
30             use constant {
31 2         5331 fmt_FLOAT => '%.6f',
32             fmt_INT => '%d',
33             fmt_STR => '%s',
34             fmt_TIME => '%d',
35 2     2   10 };
  2         4  
36              
37 6 100   6   20 sub _dor { defined $_[0] ? $_[0] : $_[1] }
38              
39             sub init {
40 1     1 0 3 my ($self, $startup_time, $config) = @_;
41              
42 1         2 for (qw(debug graphiteHost graphitePort)) {
43 3         13 $self->{$_} = $config->{$_};
44             }
45              
46 1   50     6 $config->{graphite} ||= {};
47              
48 1         6 my $globalPrefix = _dor($config->{graphite}->{globalPrefix} , "stats");
49 1         4 my $prefixCounter = _dor($config->{graphite}->{prefixCounter}, "counters");
50 1         3 my $prefixTimer = _dor($config->{graphite}->{prefixTimer} , "timers");
51 1         3 my $prefixGauge = _dor($config->{graphite}->{prefixGauge} , "gauges");
52 1         3 my $prefixSet = _dor($config->{graphite}->{prefixSet} , "sets");
53 1         3 my $legacyNamespace = _dor($config->{graphite}->{legacyNamespace} , 1);
54              
55 1         3 my $globalNamespace = [];
56 1         2 my $counterNamespace = [];
57 1         4 my $timerNamespace = [];
58 1         2 my $gaugesNamespace = [];
59 1         3 my $setsNamespace = [];
60              
61 1 50       3 if ($legacyNamespace) {
62              
63 0         0 $globalNamespace = ["stats"];
64 0         0 $counterNamespace = ["stats"];
65 0         0 $timerNamespace = ["stats", "timers"];
66 0         0 $gaugesNamespace = ["stats", "gauges"];
67 0         0 $setsNamespace = ["stats", "sets"];
68              
69             }
70             else {
71              
72 1 50       4 if ($globalPrefix ne "") {
73 1         1 push @{ $globalNamespace }, $globalPrefix;
  1         3  
74 1         1 push @{ $counterNamespace }, $globalPrefix;
  1         2  
75 1         2 push @{ $timerNamespace }, $globalPrefix;
  1         2  
76 1         1 push @{ $gaugesNamespace }, $globalPrefix;
  1         2  
77 1         2 push @{ $setsNamespace }, $globalPrefix;
  1         2  
78             }
79              
80 1 50       3 if ($prefixCounter ne "") {
81 1         1 push @{ $counterNamespace }, $prefixCounter;
  1         1  
82             }
83              
84 1 50       4 if ($prefixTimer ne "") {
85 1         1 push @{ $timerNamespace }, $prefixTimer;
  1         2  
86             }
87              
88 1 50       3 if ($prefixGauge ne "") {
89 1         1 push @{ $gaugesNamespace }, $prefixGauge;
  1         2  
90             }
91              
92 1 50       3 if ($prefixSet ne "") {
93 1         1 push @{ $setsNamespace }, $prefixSet;
  1         2  
94             }
95              
96             }
97              
98 1         2 $self->{globalPrefix} = $globalPrefix;
99 1         3 $self->{prefixCounter} = $prefixCounter;
100 1         2 $self->{prefixTimer} = $prefixTimer;
101 1         2 $self->{prefixGauge} = $prefixGauge;
102 1         2 $self->{prefixSet} = $prefixSet;
103 1         1 $self->{legacyNamespace} = $legacyNamespace;
104              
105 1         2 $self->{globalNamespace} = $globalNamespace;
106 1         2 $self->{counterNamespace} = $counterNamespace;
107 1         2 $self->{timerNamespace} = $timerNamespace;
108 1         2 $self->{gaugesNamespace} = $gaugesNamespace;
109 1         2 $self->{setsNamespace} = $setsNamespace;
110              
111             #$self->{graphiteStats}->{last_flush} =
112             #$self->{graphiteStats}->{last_exception} = $startup_time;
113              
114 1         3 $self->{flushInterval} = $config->{flushInterval};
115 1         2 $self->{prefixStats} = $config->{prefixStats};
116              
117 1 50       5 if (! $self->{prefixStats}) {
118 0         0 Carp::croak("config.prefixStats should not be blank/empty!");
119             }
120              
121             }
122              
123             sub flush {
124 0     0 0 0 my ($self, $timestamp, $metrics) = @_;
125 0         0 my $flush_stats = $self->flush_stats($timestamp, $metrics);
126 0         0 $self->post_stats($flush_stats);
127             }
128              
129             sub flush_stats {
130 1     1 0 13 my ($self, $ts, $metrics) = @_;
131              
132 1         4 my $startTime = [ Time::HiRes::gettimeofday ];
133 1         2 my $statString = "";
134 1         2 my $num_stats = 0;
135 1         1 my $timer_data_key;
136              
137 1         2 my $counters = $metrics->{counters};
138 1         2 my $gauges = $metrics->{gauges};
139 1         3 my $timers = $metrics->{timers};
140 1         2 my $sets = $metrics->{sets};
141 1         1 my $counter_rates = $metrics->{counter_rates};
142 1         2 my $timer_data = $metrics->{timer_data};
143 1         2 my $statsd_metrics = $metrics->{statsd_metrics};
144              
145             # Accumulate flush statistics into a list
146 1         1 my @fstats;
147              
148 1         2 for my $key (keys %{ $counters }) {
  1         2  
149              
150 2         3 my @namespace = (@{ $self->{counterNamespace} }, $key);
  2         5  
151 2         6 my $namespace = join(".", @namespace);
152              
153 2         3 my $value = $counters->{$key};
154 2         4 my $valuePerSecond = $counter_rates->{$key}; # pre-calculated "per second" rate
155              
156 2 50       4 if ($self->{legacyNamespace}) {
157 0         0 push @fstats, stat_float($namespace, $valuePerSecond, $ts);
158 0         0 push @fstats, stat_int("stats_counts.$key", $value, $ts);
159             } else {
160 2         7 push @fstats, stat_float("$namespace.rate", $valuePerSecond, $ts);
161 2         7 push @fstats, stat_int("$namespace.count", $value, $ts);
162             }
163              
164 2         6 $num_stats++;
165             }
166              
167 1         2 for my $key (keys %{ $timer_data }) {
  1         3  
168 0 0 0     0 if ($timer_data->{$key} && keys %{ $timer_data->{$key} } > 0) {
  0         0  
169 0         0 for my $timer_data_key (keys %{ $timer_data->{$key} }) {
  0         0  
170 0         0 my @namespace = (@{ $self->{timerNamespace} }, $key);
  0         0  
171 0         0 my $the_key = join(".", @namespace);
172 0         0 push @fstats, stat_float(
173             "$the_key.$timer_data_key",
174             $timer_data->{$key}->{$timer_data_key}, $ts
175             );
176             }
177 0         0 $num_stats++;
178             }
179             }
180              
181 1         1 for my $key (keys %{ $gauges }) {
  1         3  
182 0         0 my @namespace = (@{ $self->{gaugesNamespace} }, $key);
  0         0  
183 0         0 push @fstats, stat_float(join(".", @namespace), $gauges->{$key}, $ts);
184 0         0 $num_stats++;
185             }
186              
187 1         22 for my $key (keys %{ $sets }) {
  1         4  
188 0         0 my @namespace = (@{ $self->{setsNamespace} }, $key);
  0         0  
189 0         0 my $set_count = join(".", @namespace, "count");
190 0         0 my $set_len = scalar keys %{ $sets->{$key} };
  0         0  
191 0         0 push @fstats, stat_int($set_count, $set_len, $ts);
192 0         0 $num_stats++;
193             }
194              
195 1         3 my $g_pref = $self->{prefixStats};
196 1         1 my @namespace = (@{ $self->{globalNamespace} }, $g_pref);
  1         3  
197              
198             # Convert Time::HiRes format (µs) to ms
199 1         3 my $calcTime = 1000 * Time::HiRes::tv_interval($startTime);
200              
201 1 50       13 if ($self->{legacyNamespace}) {
202 0         0 push @fstats, stat_int("${g_pref}.numStats", $num_stats, $ts);
203 0         0 push @fstats, stat_float("stats.${g_pref}.graphiteStats.calculationtime",
204             $calcTime, $ts);
205 0         0 for my $key (keys %{ $statsd_metrics }) {
  0         0  
206 0         0 push @fstats, stat_int("stats.${g_pref}.${key}", $statsd_metrics->{$key}, $ts);
207             }
208             }
209             else {
210 1         3 my $namespace = join(".", @namespace);
211 1         4 push @fstats, stat_int("${namespace}.numStats", $num_stats, $ts);
212 1         3 push @fstats, stat_float(
213             "${namespace}.graphiteStats.calculationtime", $calcTime, $ts);
214 1         2 for my $key (keys %{ $statsd_metrics }) {
  1         3  
215 1         2 my $value = $statsd_metrics->{$key};
216 1         5 push @fstats, stat_str("${namespace}.${key}", $value, $ts);
217             }
218             }
219              
220 1         4 my $global_stats = $self->global_stats();
221 1         1 push @fstats, @{ $global_stats };
  1         3  
222              
223 1         5 return \@fstats;
224             }
225              
226             sub global_stats {
227 1     1 0 1 my ($self) = @_;
228              
229 1         3 my $g_pref = $self->{prefixStats}; # "statsd" by default
230 1 50       3 if (! $g_pref) {
231 0         0 Carp::croak("config.prefixStats is empty or invalid! (global_stats)");
232             }
233              
234 1   50     8 my $last_flush = $self->{lastFlush} || 0;
235 1   50     4 my $last_exception = $self->{lastException} || 0;
236 1         3 my $ts = time();
237              
238 1         2 my @namespace = (@{ $self->{globalNamespace} }, $g_pref, 'graphiteStats');
  1         3  
239 1         2 my $namespace = join(".", @namespace);
240              
241 1         5 my $global_stats = [
242             stat_time("${namespace}.last_exception", $last_exception, $ts),
243             stat_time("${namespace}.last_flush", $last_flush, $ts),
244             ];
245              
246 1         3 return $global_stats;
247             }
248              
249             sub post_stats {
250 0     0 0 0 my ($self, $stat_list) = @_;
251              
252 0 0       0 return if ! $self->{graphiteHost};
253              
254             eval {
255 0         0 my $host = $self->{graphiteHost};
256 0         0 my $port = $self->{graphitePort};
257 0 0       0 my $graphite = IO::Socket::INET->new(
258             PeerHost => $host,
259             PeerPort => $port,
260             ) or die "Can't connect to Graphite on ${host}:${port}: $!";
261              
262 0         0 my $stat_string = $self->stats_to_string($stat_list);
263 0         0 $graphite->send($stat_string);
264 0         0 $graphite->close();
265              
266 0         0 $self->{lastFlush} = [Time::HiRes::gettimeofday];
267             }
268 0 0       0 or do {
269 0 0       0 if ($self->{debug}) {
270             # TODO use logger!
271 0         0 warn("Exception while posting stats to Graphite: $@");
272             }
273 0         0 $self->{lastException} = [Time::HiRes::gettimeofday];
274             };
275              
276             }
277              
278             sub stat_float {
279 3     3 0 6 my ($stat, $val, $ts) = @_;
280             return {
281 3         12 stat => $stat,
282             value => $val,
283             time => $ts,
284             fmt => fmt_FLOAT,
285             };
286             }
287              
288             sub stat_int {
289 3     3 0 4 my ($stat, $val, $ts) = @_;
290             return {
291 3         11 stat => $stat,
292             value => $val,
293             time => $ts,
294             fmt => fmt_INT,
295             };
296             }
297              
298             sub stat_str {
299 1     1 0 2 my ($stat, $val, $ts) = @_;
300             return {
301 1         5 stat => $stat,
302             value => $val,
303             time => $ts,
304             fmt => fmt_STR,
305             };
306             }
307              
308             sub stat_time {
309 2     2 0 3 my ($stat, $val, $ts) = @_;
310             return {
311 2         12 stat => $stat,
312             value => $val,
313             time => $ts,
314             fmt => fmt_TIME,
315             };
316             }
317              
318             sub stats_to_string {
319 0     0 0   my ($self, $stat_list) = @_;
320 0           my $stat_string = "";
321 0           for (@{ $stat_list }) {
  0            
322 0           my $attr = $_;
323 0           my $stat = $attr->{stat};
324 0           my $val = $attr->{value};
325 0 0         next if ! defined $val;
326 0           my $ts = $attr->{time};
327 0 0         my $fmt = exists $attr->{fmt} ? $attr->{fmt} : '%d';
328             #warn "fmt=$fmt stat=$stat val=$val ts=$ts\n";
329 0           $stat_string .= sprintf("%s $fmt %d\n", $stat, $val, $ts);
330             }
331 0           return $stat_string;
332             }
333              
334             sub status {
335 0     0 0   my ($self) = @_;
336             return {
337 0           last_flush => $self->since($self->{lastFlush}),
338             last_exception => $self->since($self->{lastException}),
339             };
340             }
341              
342             1;