File Coverage

blib/lib/Net/Statsd/Server.pm
Criterion Covered Total %
statement 72 378 19.0
branch 8 92 8.7
condition 4 59 6.7
subroutine 19 47 40.4
pod 0 23 0.0
total 103 599 17.2


line stmt bran cond sub pod time code
1             # ABSTRACT: a Perl port of Etsy's statsd *server*
2              
3             package Net::Statsd::Server;
4              
5             # Use statements {{{
6              
7 3     3   47379 use strict;
  3         4  
  3         66  
8             #se warnings;
9              
10 3     3   1684 use JSON::XS ();
  3         14163  
  3         69  
11 3     3   1390 use Socket qw(SOL_SOCKET SO_RCVBUF);
  3         9083  
  3         388  
12 3     3   1291 use Time::HiRes ();
  3         2619  
  3         62  
13              
14 3     3   2576 use AnyEvent;
  3         11022  
  3         79  
15 3     3   1819 use AnyEvent::Handle;
  3         31736  
  3         86  
16 3     3   1313 use AnyEvent::Handle::UDP;
  3         151149  
  3         99  
17 3     3   1600 use AnyEvent::Log;
  3         22519  
  3         97  
18 3     3   15 use AnyEvent::Socket;
  3         3  
  3         244  
19              
20 3     3   1096 use Net::Statsd::Server::Backend;
  3         10  
  3         67  
21 3     3   1002 use Net::Statsd::Server::Metrics;
  3         17  
  3         104  
22              
23             # }}}
24              
25             # Constants and global variables {{{
26              
27             use constant {
28 3         9082 DEBUG => 0,
29             DEFAULT_CONFIG_FILE => 'localConfig.js',
30             DEFAULT_FLUSH_INTERVAL => 10_000,
31             DEFAULT_LOG_LEVEL => 'info',
32             RECEIVE_BUFFER_MB => 8, # 0 = setsockopt disabled
33 3     3   14 };
  3         3  
34              
35             our $VERSION = '0.17';
36             our $logger;
37              
38             # }}}
39              
40             sub new {
41 1     1 0 380 my ($class, $opt) = @_;
42 1   50     3 $opt ||= {};
43 1   33     5 $class = ref $class || $class;
44              
45 1         5 my $startup_time = time();
46              
47             # Initialize data structures with defaults for statsd stats
48             my $self = {
49              
50             startup_time => $startup_time,
51             start_time_hi => [Time::HiRes::gettimeofday],
52              
53             server => undef,
54             mgmtServer => undef,
55              
56             config_file => $opt->{config_file},
57 1         16 config => undef,
58             stats => {
59             messages => {
60             "last_msg_seen" => $startup_time,
61             "bad_lines_seen" => 0,
62             }
63             },
64             metrics => undef,
65              
66             debugInt => undef,
67             flushInterval => undef,
68              
69             backends => [],
70             logger => $logger,
71             };
72              
73             $self->{$_} = $opt->{$_}
74 1         2 for keys %{ $opt };
  1         5  
75              
76 1         3 bless $self, $class;
77             }
78              
79             # Flatten JSON booleans to avoid calls to JSON::XS::bool
80             # in the performance-critical code paths
81             sub _flatten_bools {
82 1     1   1 my ($self, $conf_hash) = @_;
83 1         3 for (qw(dumpMessages debug)) {
84 2         34 $conf_hash->{$_} = !! $conf_hash->{$_};
85             }
86 1         5 return $conf_hash;
87             }
88              
89             sub _json_emitter {
90 0     0   0 my ($self) = @_;
91 0         0 my $js = JSON::XS->new()
92             ->utf8(1)
93             ->shrink(1)
94             ->space_before(0)
95             ->space_after(1)
96             ->indent(0);
97 0         0 return $js;
98             }
99              
100             sub _start_time_hi {
101 0     0   0 return $_[0]->{start_time_hi};
102             }
103              
104             sub config_defaults {
105             return {
106 1     1 0 10 "debug" => 0,
107             "debugInterval" => 10000, # ms
108             "graphitePort" => 2003,
109             "port" => 8125,
110             "address" => "0.0.0.0",
111             "mgmt_port" => 8126,
112             "mgmt_address" => "0.0.0.0",
113             "flushInterval" => DEFAULT_FLUSH_INTERVAL, # ms
114             #"keyFlush" => {
115             # "interval" => 10, # s
116             # "percent" => 100,
117             # "log" => "",
118             #},
119             "log" => {
120             "backend" => "stdout",
121             "level" => "LOG_INFO",
122             },
123              
124             "prefixStats" => "statsd",
125             "dumpMessages" => 0,
126              
127             "deleteIdleStats" => 0,
128             #"deleteCounters" => 0,
129             #"deleteGauges" => 0,
130             #"deleteSets" => 0,
131             #"deleteTimers" => 0,
132              
133             "percentThreshold" => [ 90 ],
134              
135             "backends" => [
136             "Console",
137             ],
138             };
139             }
140              
141             sub config {
142 1     1 0 5 my ($self, $config_file) = @_;
143              
144 1 50 33     9 if (exists $self->{config} && defined $self->{config}) {
145 0         0 return $self->{config};
146             }
147              
148 1   33     5 $config_file ||= $self->config_file();
149              
150 1 50       15 if (! -e $config_file) {
151 0         0 return;
152             }
153              
154 1         2 my $defaults = $self->config_defaults();
155              
156 1 50       22 open my $conf_fh, '<', $config_file
157             or return $defaults;
158              
159 1         16 my $conf_json = join("", <$conf_fh>);
160 1         6 close $conf_fh;
161              
162 1         10 my $json = JSON::XS->new->relaxed->utf8;
163 1         10 my $conf_hash = $json->decode($conf_json);
164              
165 1         3 $conf_hash = $self->_flatten_bools($conf_hash);
166              
167             # Poor man's Hash::Merge
168 1         1 for (keys %{ $defaults }) {
  1         4  
169 14 100       19 if (! exists $conf_hash->{$_}) {
170 4         5 $conf_hash->{$_} = $defaults->{$_};
171             }
172             }
173              
174 1         9 return $self->{config} = $conf_hash;
175             }
176              
177             sub clear_metrics {
178 0     0 0 0 my ($self) = @_;
179              
180 0         0 my $conf = $self->config;
181              
182 0         0 my $del_counters = $conf->{deleteCounters};
183 0         0 my $del_gauges = $conf->{deleteGauges};
184 0         0 my $del_sets = $conf->{deleteSets};
185 0         0 my $del_timers = $conf->{deleteTimers};
186              
187             # Metrics that are not seen in the interval won't
188             # be sent anymore. Enable this with 'deleteIdleStats'
189 0         0 my $del_idle = _defined_or($conf->{deleteIdleStats}, 0);
190              
191 0 0       0 if ($del_idle) {
192 0         0 $del_counters = _defined_or($del_counters, 1);
193 0         0 $del_gauges = _defined_or($del_gauges, 1);
194 0         0 $del_timers = _defined_or($del_timers, 1);
195 0         0 $del_sets = _defined_or($del_sets, 1);
196             }
197              
198             # Whether to just reset them to zero or to wipe them
199 0         0 my $metrics = $self->{metrics};
200 0 0       0 if ($del_counters) {
201 0         0 $metrics->{counters} = {};
202 0         0 $metrics->{counter_rates} = {};
203             }
204             else {
205 0         0 my $counters = $metrics->{counters};
206 0         0 my $counter_rates = $metrics->{counter_rates};
207 0         0 $_ = 0 for
208 0         0 values %{ $counters },
209 0         0 values %{ $counter_rates };
210             }
211              
212 0 0       0 if ($del_timers) {
213 0         0 $metrics->{timers} = {};
214 0         0 $metrics->{timer_data} = {};
215             }
216             else {
217 0         0 my $timers = $metrics->{timers};
218 0         0 my $timer_data = $metrics->{timer_data};
219 0         0 $_ = [] for
220 0         0 values %{ $timers },
221 0         0 values %{ $timer_data };
222             }
223              
224 0 0       0 if ($del_gauges) {
225 0         0 $metrics->{gauges} = {};
226             }
227              
228 0 0       0 if ($del_sets) {
229 0         0 $metrics->{sets} = {};
230             }
231             else {
232 0         0 my $sets = $metrics->{sets};
233 0         0 $_ = {} for values %{ $sets };
  0         0  
234             }
235              
236 0         0 return;
237             }
238              
239             sub config_file {
240 1     1 0 4 _defined_or($_[0]->{config_file}, DEFAULT_CONFIG_FILE);
241             }
242              
243             sub flush_metrics {
244 0     0 0 0 my ($self) = @_;
245 0         0 my $flush_start_time = time;
246 0         0 $logger->(notice => "flushing metrics");
247 0         0 my $flush_interval = $self->config->{flushInterval};
248 0         0 my $metrics = $self->metrics->process($flush_interval);
249             $self->foreach_backend(sub {
250 0     0   0 $_[0]->flush($flush_start_time, $metrics);
251 0         0 });
252 0         0 $self->clear_metrics();
253 0         0 return;
254             }
255              
256             # This is the performance-critical section of Net::Statsd::Server.
257             # Everything below has been optimised for performance rather than
258             # legibility or transparency. Be careful.
259              
260             sub handle_client_packet {
261 0     0 0 0 my ($self, $request) = @_;
262              
263 0         0 my $config = $self->{config};
264 0         0 my $metrics = $self->{metrics};
265 0         0 my $counters = $metrics->{counters};
266 0         0 my $stats = $self->{stats};
267 0         0 my $g_pref = $config->{prefixStats};
268              
269 0         0 $counters->{"${g_pref}.packets_received"}++;
270              
271             # TODO backendEvents.emit('packet', msg, rinfo);
272              
273 0         0 my @metrics = split("\n", $request);
274              
275 0         0 my $dump_messages = $config->{dumpMessages};
276             my $must_count_keys = exists $config->{keyFlush}
277 0   0     0 && $config->{keyFlush}->{interval};
278              
279 0         0 for my $m (@metrics) {
280              
281 0 0       0 $logger->(debug => $m) if $dump_messages;
282              
283 0         0 my @bits = split(":", $m);
284 0         0 my $key = shift @bits;
285              
286             # Keep [,=] to allow sending of tags. Handy for Influxdb integration.
287 0         0 $key =~ y{/ }{_-}s;
288 0         0 $key =~ y{a-zA-Z0-9_\-\.,=}{}cd;
289              
290             # Not very clear here. Etsy's code was doing this differently
291 0 0       0 if ($must_count_keys) {
292 0         0 my $key_counter = $metrics->{keyCounter};
293 0         0 $key_counter->{$key}++;
294             }
295              
296 0 0       0 push @bits, "1" if 0 == @bits;
297              
298 0         0 for my $i (0..$#bits) {
299              
300 0         0 my $sample_rate = 1;
301 0         0 my @fields = split(/\|/, $bits[$i]);
302              
303 0 0 0     0 if (! defined $fields[1] || $fields[1] eq "") {
304 0         0 $logger->(warn => "Bad line: $bits[$i] in msg \"$m\"");
305 0         0 $counters->{"${g_pref}.bad_lines_seen"}++;
306 0         0 $stats->{"messages"}->{"bad_lines_seen"}++;
307 0         0 next;
308             }
309              
310 0   0     0 my $value = $fields[0] || 0;
311 0         0 my $unit = $fields[1];
312 0         0 for ($unit) {
313 0         0 s{^\s*}{};
314 0         0 s{\s*$}{};
315             }
316              
317             # Timers
318 0 0       0 if ($unit eq "ms") {
    0          
    0          
319 0         0 my $timers = $metrics->{timers};
320 0   0     0 $timers->{$key} ||= [];
321 0         0 push @{ $timers->{$key} }, $value;
  0         0  
322             }
323              
324             # Gauges
325             elsif ($unit eq "g") {
326 0         0 my $gauges = $metrics->{gauges};
327 0         0 $gauges->{$key} = $value;
328             }
329              
330             # Sets
331             elsif ($unit eq "s") {
332             # Treat set as a normal hash with undef keys
333             # to minimize memory consumption *and* insertion speed
334 0         0 my $sets = $metrics->{sets};
335 0   0     0 $sets->{$key} ||= {};
336 0         0 $sets->{$key}->{$value} = undef;
337             }
338              
339             # Counters
340             else {
341 0 0       0 if (defined $fields[2]) {
342 0 0       0 if ($fields[2] =~ m{^\@([\d\.]+)}) {
343 0         0 $sample_rate = $1 + 0;
344             }
345             else {
346 0         0 $logger->(warn => "Bad line: $bits[$i] in msg \"$m\"; has invalid sample rate");
347 0         0 $counters->{"${g_pref}.bad_lines_seen"}++;
348 0         0 $stats->{"messages"}->{"bad_lines_seen"}++;
349 0         0 next;
350             }
351             }
352 0   0     0 $counters->{$key} ||= 0;
353 0   0     0 $value ||= 1;
354 0         0 $value /= $sample_rate;
355 0         0 $counters->{$key} += $value;
356             }
357             }
358             }
359              
360 0         0 $stats->{"messages"}->{"last_msg_seen"} = time();
361             }
362              
363             sub handle_manager_command {
364 0     0 0 0 my ($self, $handle, $request) = @_;
365 0         0 my @cmdline = split(" ", trim($request));
366 0         0 my $cmd = shift @cmdline;
367 0         0 my $reply;
368              
369 0         0 $logger->(notice => "Received manager command '$cmd' (req=$request)");
370              
371 0 0       0 if ($cmd eq "help") {
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
372 0         0 $reply = (
373             "Commands: stats, counters, timers, gauges, delcounters, deltimers, delgauges, quit\015\012\015\012"
374             );
375             }
376             elsif ($cmd eq "stats") {
377 0         0 my $now = time;
378 0         0 my $uptime = $now - $self->{startup_time};
379 0         0 $reply = "uptime: $uptime\n";
380              
381             # Loop through the base stats
382 0         0 my $stats = $self->stats;
383              
384 0         0 for my $group (keys %{$stats}) {
  0         0  
385 0         0 for my $metric (keys %{$stats->{$group}}) {
  0         0  
386 0         0 my $val = $stats->{$group}->{$metric};
387 0 0       0 my $delta = $metric =~ m{^last_}
388             ? $now - $val
389             : $val;
390 0         0 $reply .= "${group}.${metric}: ${delta}\n";
391             }
392             }
393              
394             $self->foreach_backend(sub {
395 0     0   0 my $backend_status = $_[0]->status;
396 0 0 0     0 if ($backend_status && ref $backend_status eq "HASH") {
397 0         0 for (keys %{ $backend_status }) {
  0         0  
398             $reply .= sprintf("%s.%s: %s\n",
399             lc($_[0]->name),
400 0         0 $_ => $backend_status->{$_}
401             );
402             }
403             }
404 0         0 });
405              
406 0         0 $reply .= "END\n\n";
407             }
408             elsif ($cmd eq "counters") {
409 0         0 my $counters = $self->{metrics}->{counters};
410 0         0 $reply = $self->_json_emitter()->encode($counters);
411 0         0 $reply .= "\nEND\n\n";
412             }
413             elsif ($cmd eq "timers") {
414 0         0 my $timers = $self->{metrics}->{timers};
415 0         0 $reply = $self->_json_emitter()->encode($timers);
416 0         0 $reply .= "\nEND\n\n";
417             }
418             elsif ($cmd eq "gauges") {
419 0         0 my $gauges = $self->{metrics}->{gauges};
420 0         0 $reply = $self->_json_emitter()->encode($gauges);
421 0         0 $reply .= "\nEND\n\n";
422             }
423             elsif ($cmd eq "sets") {
424 0         0 my $sets = $self->{metrics}->{sets};
425 0         0 my $sets_as_lists = {};
426             # FIXME Not really happy about this...
427             # if you have huge sets, it's going to suck.
428             # If you have huge sets, probably statsd is not for you anyway.
429 0         0 for my $set (keys %{$sets}) {
  0         0  
430 0         0 $sets_as_lists->{$set} = [ keys %{ $sets->{$set} } ];
  0         0  
431             }
432 0         0 $reply = $self->_json_emitter()->encode($sets_as_lists);
433 0         0 $reply .= "\nEND\n\n";
434             }
435             elsif ($cmd eq "delcounters") {
436 0         0 my $counters = $self->{metrics}->{counters};
437 0         0 for my $name (@cmdline) {
438 0         0 delete $counters->{$name};
439 0         0 $reply .= "deleted: $name\n";
440             }
441 0         0 $reply .= "\nEND\n\n";
442             }
443             elsif ($cmd eq "deltimers") {
444 0         0 my $timers = $self->{metrics}->{timers};
445 0         0 for my $name (@cmdline) {
446 0         0 delete $timers->{$name};
447 0         0 $reply .= "deleted: $name\n";
448             }
449 0         0 $reply .= "\nEND\n\n";
450             }
451             elsif ($cmd eq "delgauges") {
452 0         0 my $gauges = $self->{metrics}->{gauges};
453 0         0 for my $name (@cmdline) {
454 0         0 delete $gauges->{$name};
455 0         0 $reply .= "deleted: $name\n";
456             }
457 0         0 $reply .= "\nEND\n\n";
458             }
459             elsif ($cmd eq "quit") {
460 0         0 undef $reply;
461 0         0 $handle->destroy();
462             }
463             else {
464 0         0 $reply = "ERROR\n";
465             }
466 0         0 return $reply;
467             }
468              
469             sub handle_manager_connection {
470 0     0 0 0 my ($self, $handle, $line) = @_;
471             #$logger->(notice => "Received mgmt command [$line]");
472 0 0       0 if (my $reply = $self->handle_manager_command($handle, $line)) {
473 0         0 $logger->(notice => "Sending mgmt reply [$reply]");
474 0         0 $handle->push_write($reply);
475             # Accept a new command on the same connection
476             $handle->push_read(line => sub {
477 0     0   0 handle_manager_connection($self, @_)
478 0         0 });
479             }
480             else {
481 0         0 $logger->(notice => "Shutting down socket");
482 0         0 $handle->push_write("\n");
483 0         0 $handle->destroy;
484             }
485             }
486              
487             sub init_backends {
488 0     0 0 0 my ($self) = @_;
489 0         0 my $backends = $self->config->{backends};
490 0 0 0     0 if (! $backends or ref $backends ne 'ARRAY') {
491 0         0 die "At least one backend is needed in your configuration";
492             }
493 0         0 for my $backend (@{ $backends }) {
  0         0  
494              
495             # Nodejs statsd expects a relative path
496 0 0       0 if ($backend =~ m{^ \./backends/ (.+) $}x) {
497 0         0 $backend = $1;
498             }
499              
500 0         0 my $pkg = $backend;
501 0 0       0 if ($backend =~ m{^ (\w+) $}x) {
502 0         0 $pkg = ucfirst lc $pkg;
503 0         0 $pkg = "Net::Statsd::Server::Backend::${pkg}";
504             }
505 0         0 my $mod = $pkg;
506 0         0 $mod =~ s{::}{/}g;
507 0         0 $mod .= ".pm";
508             eval {
509 0         0 require $mod ; 1
  0         0  
510 0 0       0 } or do {
511 0         0 $logger->(error=>"Backend ${backend} failed to load: $@");
512 0         0 next;
513             };
514 0         0 $self->register_backend($pkg);
515             }
516             }
517              
518             sub init_logger {
519 0     0 0 0 my ($self, $config) = @_;
520              
521 0   0     0 $config ||= {};
522              
523 0   0     0 my $backend = $config->{backend} || 'stdout';
524 0   0     0 my $level = lc($config->{level} || 'LOG_INFO');
525 0         0 $level =~ s{^log_}{};
526              
527 0 0       0 if ($backend eq 'stdout') {
    0          
528 0         0 $AnyEvent::Log::FILTER->level($level);
529             }
530             elsif ($backend eq 'syslog') {
531             # Syslog logging works commenting out the FILTER->level line
532 0         0 $AnyEvent::Log::COLLECT->attach(
533             AnyEvent::Log::Ctx->new(
534             level => $level,
535             log_to_syslog => "user",
536             )
537             );
538             }
539 0   0 0   0 $logger ||= sub { AE::log(shift(@_), shift(@_)) };
  0         0  
540             }
541              
542             sub logger {
543 0     0 0 0 return $logger;
544             }
545              
546             sub metrics {
547 0     0 0 0 $_[0]->{metrics};
548             }
549              
550             sub register_backend {
551 0     0 0 0 my ($self, $backend) = @_;
552 0   0     0 $self->{backends} ||= [];
553 0         0 my $backend_instance = $backend->new(
554             $self->_start_time_hi, $self->config,
555             );
556 0         0 $logger->(notice => "Initializing $backend backend");
557 0         0 push @{ $self->{backends} }, $backend_instance;
  0         0  
558             }
559              
560             sub foreach_backend {
561 0     0 0 0 my ($self, $callback) = @_;
562 0   0     0 my $backends = $self->{backends} || [];
563 0         0 for my $obj (@{ $backends }) {
  0         0  
564             eval {
565 0         0 $callback->($obj); 1;
  0         0  
566 0 0       0 } or do {
567 0         0 $logger->(error => "Failed callback on $obj backend: $@");
568             };
569             }
570             }
571              
572             sub reload_config {
573 0     0 0 0 my ($self) = @_;
574 0         0 delete $self->{config};
575 0         0 $logger->(warn => "Received SIGHUP: reloading configuration");
576 0         0 return $self->{config} = $self->config();
577             }
578              
579             sub setup_flush_timer {
580 0     0 0 0 my ($self) = @_;
581              
582             my $flush_interval = $self->config->{flushInterval}
583 0   0     0 || DEFAULT_FLUSH_INTERVAL;
584              
585 0         0 $flush_interval = $flush_interval / 1000;
586 0         0 $logger->(notice => "metrics flush will happen every ${flush_interval}s");
587              
588             my $flush_t = AE::timer $flush_interval, $flush_interval, sub {
589 0     0   0 $self->flush_metrics
590 0         0 };
591              
592 0         0 return $flush_t;
593             }
594              
595 1 50   1   4 sub _defined_or { defined $_[0] ? $_[0] : $_[1] }
596              
597             sub setup_keyflush_timer {
598 0     0 0 0 my ($self) = @_;
599              
600 0         0 my $conf_kf = $self->config->{keyFlush};
601 0         0 my $kf_interval = _defined_or($conf_kf->{interval}, 0);
602 0 0       0 return if $kf_interval <= 0;
603              
604             # Always milliseconds in the config!
605 0         0 $kf_interval /= 1000;
606              
607 0         0 my $kf_pct = _defined_or($conf_kf->{percent}, 100);
608 0         0 my $kf_log = $conf_kf->{log};
609              
610 0   0     0 $logger->(notice => "flushing top ${kf_pct}% keys to "
611             . ($kf_log || "stdout")
612             . " every ${kf_interval}s"
613             );
614              
615             my $kf_timer = AE::timer $kf_interval, $kf_interval, sub {
616 0     0   0 $self->flush_top_keys()
617 0         0 };
618              
619 0         0 return $kf_timer;
620             }
621              
622             sub flush_top_keys {
623 0     0 0 0 my ($self) = @_;
624              
625 0         0 my $conf_kf = _defined_or($self->config->{keyFlush}, {});
626 0         0 my $kf_interval = _defined_or($conf_kf->{interval}, 0);
627 0         0 $kf_interval /= 1000;
628              
629 0   0     0 my $kf_pct = $conf_kf->{percent} || 100;
630 0         0 my $kf_log = $conf_kf->{log};
631              
632 0         0 my @sorted_keys;
633 0         0 my $key_counter = $self->metrics->{keyCounter};
634 0         0 while (my ($k, $v) = each %{ $key_counter }) {
  0         0  
635 0         0 push @sorted_keys, [ $k, $v ];
636             }
637              
638 0         0 @sorted_keys = sort { $b->[1] <=> $a->[1] } @sorted_keys;
  0         0  
639              
640 0         0 my @time = localtime;
641 0         0 my $time_str = sprintf "%04d-%02d-%02d %02d:%02d:%02d",
642             $time[5] + 1900, $time[4] + 1, $time[3],
643             $time[2], $time[1], $time[0];
644              
645 0         0 my $log_message = "";
646              
647             # Only show the top keyFlush.percent keys
648 0         0 my $top_pct_limit = int(scalar(@sorted_keys) * $kf_pct / 100);
649 0         0 for my $i (0 .. $top_pct_limit - 1) {
650 0         0 $log_message .= sprintf "$time_str count=%d key=%s\n",
651             $sorted_keys[$i][1], $sorted_keys[$i][0];
652             }
653              
654 0 0       0 if ($kf_log) {
655 0 0       0 if (open my $log_fh, '>>', $kf_log) {
656 0         0 $log_fh->printflush($log_message);
657 0         0 $log_fh->close();
658             }
659             } else {
660 0         0 print $log_message;
661             }
662              
663             # Clear the counters
664 0         0 $self->metrics->{keyCounter} = {};
665              
666             }
667              
668             sub init_metrics {
669 0     0 0 0 my ($self) = @_;
670 0         0 my $config = $self->config;
671 0         0 $self->{metrics} = Net::Statsd::Server::Metrics->new($config);
672 0         0 return $self->{metrics};
673             }
674              
675             sub start_server {
676 0     0 0 0 my ($self, $config) = @_;
677              
678 0 0       0 if (! defined $config) {
679 0         0 $config = $self->config();
680             }
681              
682 0         0 $self->init_logger($config->{log});
683              
684 0   0     0 my $host = $config->{address} || '0.0.0.0';
685 0   0     0 my $port = $config->{port} || 8125;
686              
687 0   0     0 my $mgmt_host = $config->{mgmt_address} || '0.0.0.0';
688 0   0     0 my $mgmt_port = $config->{mgmt_port} || 8126;
689              
690 0         0 $self->init_backends();
691 0         0 $self->init_metrics();
692              
693             # Statsd clients interface (UDP)
694             $self->{server} = AnyEvent::Handle::UDP->new(
695             bind => [$host, $port],
696             on_recv => sub {
697 0     0   0 my ($data, $ae_handle, $client_addr) = @_;
698             #$logger->(debug => "Got data=$data self=$self");
699 0         0 $self->handle_client_packet($data);
700             },
701 0         0 );
702              
703             # Bump up SO_RCVBUF on UDP socket, to buffer up incoming
704             # UDP packets, to avoid significant packet loss under load.
705             # Read more: http://bit.ly/10eeFoE
706 0         0 if (RECEIVE_BUFFER_MB > 0) {
707             # On some systems this could fail (cpantesters reports)
708             # Have it emit a warning instead of throwing an exception
709 0 0       0 setsockopt($self->{server}->fh, SOL_SOCKET,
710             SO_RCVBUF, RECEIVE_BUFFER_MB * 1048576)
711             or warn "Couldn't set SO_RCVBUF: $!";
712             }
713              
714             # Management interface (TCP, for 'stats' command, etc...)
715             $self->{mgmtServer} = AnyEvent::Socket::tcp_server $mgmt_host, $mgmt_port, sub {
716 0 0   0   0 my ($fh, $host, $port) = @_
717             or die "Unable to connect: $!";
718              
719 0         0 my $handle; $handle = AnyEvent::Handle->new(
720             fh => $fh,
721             on_error => sub {
722 0         0 AE::log error => $_[2],
723             $_[0]->destroy;
724             },
725             on_eof => sub {
726 0         0 $handle->destroy;
727 0         0 AE::log info => "Done.",
728             },
729 0         0 );
730              
731             $handle->push_read(line => sub {
732 0         0 handle_manager_connection($self, @_)
733 0         0 });
734 0         0 };
735              
736 0         0 $logger->(notice => "statsd server started on ${host}:${port} (v${VERSION})");
737 0         0 $logger->(notice => "manager interface started on ${mgmt_host}:${mgmt_port}");
738              
739 0         0 my $f_ti = $self->setup_flush_timer;
740 0         0 my $kf_ti = $self->setup_keyflush_timer;
741              
742             # This will block waiting for
743             # incoming connections (TCP) or packets (UDP)
744 0         0 my $cv = AE::cv;
745 0         0 $cv->recv();
746             }
747              
748             sub stats {
749 0     0 0 0 $_[0]->{stats};
750             }
751              
752             sub trim {
753 8     8 0 5248 my $s = shift;
754 8 100       19 return unless defined $s;
755 7         32 $s =~ s{^\s+}{};
756 7         12 $s =~ s{\s+$}{};
757 7         11 return $s;
758             }
759              
760             1;
761              
762             __END__