File Coverage

blib/lib/Net/Statsd/Server.pm
Criterion Covered Total %
statement 72 379 19.0
branch 8 92 8.7
condition 4 59 6.7
subroutine 19 47 40.4
pod 0 23 0.0
total 103 600 17.1


line stmt bran cond sub pod time code
1             # ABSTRACT: a Perl port of Etsy's statsd *server*
2              
3             package Net::Statsd::Server;
4              
5             # Use statements {{{
6              
7 3     3   96401 use strict;
  3         6  
  3         563  
8             #se warnings;
9              
10 3     3   4187 use JSON::XS ();
  3         33618  
  3         168  
11 3     3   3616 use Socket qw(SOL_SOCKET SO_RCVBUF);
  3         15771  
  3         1104  
12 3     3   3944 use Time::HiRes ();
  3         8285  
  3         93  
13              
14 3     3   5033 use AnyEvent;
  3         20500  
  3         217  
15 3     3   4662 use AnyEvent::Handle;
  3         61803  
  3         138  
16 3     3   3703 use AnyEvent::Handle::UDP;
  3         399019  
  3         204  
17 3     3   4012 use AnyEvent::Log;
  3         43786  
  3         152  
18 3     3   35 use AnyEvent::Socket;
  3         6  
  3         453  
19              
20 3     3   2292 use Net::Statsd::Server::Backend;
  3         14  
  3         84  
21 3     3   1871 use Net::Statsd::Server::Metrics;
  3         8  
  3         259  
22              
23             # }}}
24              
25             # Constants and global variables {{{
26              
27             use constant {
28 3         16326 DEBUG => 0,
29             DEFAULT_CONFIG_FILE => 'localConfig.js',
30             DEFAULT_FLUSH_INTERVAL => 10_000,
31             DEFAULT_LOG_LEVEL => 'info',
32             RECEIVE_BUFFER_MB => 8, # 0 = setsockopt disabled
33 3     3   25 };
  3         5  
34              
35             our $VERSION = '0.17';
36             our $logger;
37              
38             # }}}
39              
40             sub new {
41 1     1 0 171 my ($class, $opt) = @_;
42 1   50     4 $opt ||= {};
43 1   33     6 $class = ref $class || $class;
44              
45 1         7 my $startup_time = time();
46              
47             # Initialize data structures with defaults for statsd stats
48 1         35 my $self = {
49              
50             startup_time => $startup_time,
51             start_time_hi => [Time::HiRes::gettimeofday],
52              
53             server => undef,
54             mgmtServer => undef,
55              
56             config_file => $opt->{config_file},
57             config => undef,
58             stats => {
59             messages => {
60             "last_msg_seen" => $startup_time,
61             "bad_lines_seen" => 0,
62             }
63             },
64             metrics => undef,
65              
66             debugInt => undef,
67             flushInterval => undef,
68              
69             backends => [],
70             logger => $logger,
71             };
72              
73 1         5 $self->{$_} = $opt->{$_}
74 1         5 for keys %{ $opt };
75              
76 1         4 bless $self, $class;
77             }
78              
79             # Flatten JSON booleans to avoid calls to JSON::XS::bool
80             # in the performance-critical code paths
81             sub _flatten_bools {
82 1     1   2 my ($self, $conf_hash) = @_;
83 1         3 for (qw(dumpMessages debug)) {
84 2         76 $conf_hash->{$_} = !! $conf_hash->{$_};
85             }
86 1         5 return $conf_hash;
87             }
88              
89             sub _json_emitter {
90 0     0   0 my ($self) = @_;
91 0         0 my $js = JSON::XS->new()
92             ->utf8(1)
93             ->shrink(1)
94             ->space_before(0)
95             ->space_after(1)
96             ->indent(0);
97 0         0 return $js;
98             }
99              
100             sub _start_time_hi {
101 0     0   0 return $_[0]->{start_time_hi};
102             }
103              
104             sub config_defaults {
105             return {
106 1     1 0 13 "debug" => 0,
107             "debugInterval" => 10000, # ms
108             "graphitePort" => 2003,
109             "port" => 8125,
110             "address" => "0.0.0.0",
111             "mgmt_port" => 8126,
112             "mgmt_address" => "0.0.0.0",
113             "flushInterval" => DEFAULT_FLUSH_INTERVAL, # ms
114             #"keyFlush" => {
115             # "interval" => 10, # s
116             # "percent" => 100,
117             # "log" => "",
118             #},
119             "log" => {
120             "backend" => "stdout",
121             "level" => "LOG_INFO",
122             },
123              
124             "prefixStats" => "statsd",
125             "dumpMessages" => 0,
126              
127             "deleteIdleStats" => 0,
128             #"deleteCounters" => 0,
129             #"deleteGauges" => 0,
130             #"deleteSets" => 0,
131             #"deleteTimers" => 0,
132              
133             "percentThreshold" => [ 90 ],
134              
135             "backends" => [
136             "Console",
137             ],
138             };
139             }
140              
141             sub config {
142 1     1 0 6 my ($self, $config_file) = @_;
143              
144 1 50 33     10 if (exists $self->{config} && defined $self->{config}) {
145 0         0 return $self->{config};
146             }
147              
148 1   33     7 $config_file ||= $self->config_file();
149              
150 1 50       19 if (! -e $config_file) {
151 0         0 return;
152             }
153              
154 1         4 my $defaults = $self->config_defaults();
155              
156 1 50       30 open my $conf_fh, '<', $config_file
157             or return $defaults;
158              
159 1         22 my $conf_json = join("", <$conf_fh>);
160 1         9 close $conf_fh;
161              
162 1         17 my $json = JSON::XS->new->relaxed->utf8;
163 1         16 my $conf_hash = $json->decode($conf_json);
164              
165 1         3 $conf_hash = $self->_flatten_bools($conf_hash);
166              
167             # Poor man's Hash::Merge
168 1         2 for (keys %{ $defaults }) {
  1         4  
169 14 100       24 if (! exists $conf_hash->{$_}) {
170 4         8 $conf_hash->{$_} = $defaults->{$_};
171             }
172             }
173              
174 1         9 return $self->{config} = $conf_hash;
175             }
176              
177             sub clear_metrics {
178 0     0 0 0 my ($self) = @_;
179              
180 0         0 my $conf = $self->config;
181              
182 0         0 my $del_counters = $conf->{deleteCounters};
183 0         0 my $del_gauges = $conf->{deleteGauges};
184 0         0 my $del_sets = $conf->{deleteSets};
185 0         0 my $del_timers = $conf->{deleteTimers};
186              
187             # Metrics that are not seen in the interval won't
188             # be sent anymore. Enable this with 'deleteIdleStats'
189 0         0 my $del_idle = _defined_or($conf->{deleteIdleStats}, 0);
190              
191 0 0       0 if ($del_idle) {
192 0         0 $del_counters = _defined_or($del_counters, 1);
193 0         0 $del_gauges = _defined_or($del_gauges, 1);
194 0         0 $del_timers = _defined_or($del_timers, 1);
195 0         0 $del_sets = _defined_or($del_sets, 1);
196             }
197              
198             # Whether to just reset them to zero or to wipe them
199 0         0 my $metrics = $self->{metrics};
200 0 0       0 if ($del_counters) {
201 0         0 $metrics->{counters} = {};
202 0         0 $metrics->{counter_rates} = {};
203             }
204             else {
205 0         0 my $counters = $metrics->{counters};
206 0         0 my $counter_rates = $metrics->{counter_rates};
207 0         0 $_ = 0 for
  0         0  
208 0         0 values %{ $counters },
209             values %{ $counter_rates };
210             }
211              
212 0 0       0 if ($del_timers) {
213 0         0 $metrics->{timers} = {};
214 0         0 $metrics->{timer_data} = {};
215             }
216             else {
217 0         0 my $timers = $metrics->{timers};
218 0         0 my $timer_data = $metrics->{timer_data};
219 0         0 $_ = [] for
  0         0  
220 0         0 values %{ $timers },
221             values %{ $timer_data };
222             }
223              
224 0 0       0 if ($del_gauges) {
225 0         0 $metrics->{gauges} = {};
226             }
227              
228 0 0       0 if ($del_sets) {
229 0         0 $metrics->{sets} = {};
230             }
231             else {
232 0         0 my $sets = $metrics->{sets};
233 0         0 $_ = {} for values %{ $sets };
  0         0  
234             }
235              
236 0         0 return;
237             }
238              
239             sub config_file {
240 1     1 0 4 _defined_or($_[0]->{config_file}, DEFAULT_CONFIG_FILE);
241             }
242              
243             sub flush_metrics {
244 0     0 0 0 my ($self) = @_;
245 0         0 my $flush_start_time = time;
246 0         0 $logger->(notice => "flushing metrics");
247 0         0 my $flush_interval = $self->config->{flushInterval};
248 0         0 my $metrics = $self->metrics->process($flush_interval);
249             $self->foreach_backend(sub {
250 0     0   0 $_[0]->flush($flush_start_time, $metrics);
251 0         0 });
252 0         0 $self->clear_metrics();
253 0         0 return;
254             }
255              
256             # This is the performance-critical section of Net::Statsd::Server.
257             # Everything below has been optimised for performance rather than
258             # legibility or transparency. Be careful.
259              
260             sub handle_client_packet {
261 0     0 0 0 my ($self, $request) = @_;
262              
263 0         0 my $config = $self->{config};
264 0         0 my $metrics = $self->{metrics};
265 0         0 my $counters = $metrics->{counters};
266 0         0 my $stats = $self->{stats};
267 0         0 my $g_pref = $config->{prefixStats};
268              
269 0         0 $counters->{"${g_pref}.packets_received"}++;
270              
271             # TODO backendEvents.emit('packet', msg, rinfo);
272              
273 0         0 my @metrics = split("\n", $request);
274              
275 0         0 my $dump_messages = $config->{dumpMessages};
276 0   0     0 my $must_count_keys = exists $config->{keyFlush}
277             && $config->{keyFlush}->{interval};
278              
279 0         0 for my $m (@metrics) {
280              
281 0 0       0 $logger->(debug => $m) if $dump_messages;
282              
283 0         0 my @bits = split(":", $m);
284 0         0 my $key = shift @bits;
285              
286 0         0 $key =~ y{/ }{_-}s;
287 0         0 $key =~ y{a-zA-Z0-9_\-\.}{}cd;
288              
289             # Not very clear here. Etsy's code was doing this differently
290 0 0       0 if ($must_count_keys) {
291 0         0 my $key_counter = $metrics->{keyCounter};
292 0         0 $key_counter->{$key}++;
293             }
294              
295 0 0       0 push @bits, "1" if 0 == @bits;
296              
297 0         0 for my $i (0..$#bits) {
298              
299 0         0 my $sample_rate = 1;
300 0         0 my @fields = split(/\|/, $bits[$i]);
301              
302 0 0 0     0 if (! defined $fields[1] || $fields[1] eq "") {
303 0         0 $logger->(warn => "Bad line: $bits[$i] in msg \"$m\"");
304 0         0 $counters->{"${g_pref}.bad_lines_seen"}++;
305 0         0 $stats->{"messages"}->{"bad_lines_seen"}++;
306 0         0 next;
307             }
308              
309 0   0     0 my $value = $fields[0] || 0;
310 0         0 my $unit = $fields[1];
311 0         0 for ($unit) {
312 0         0 s{^\s*}{};
313 0         0 s{\s*$}{};
314             }
315              
316             # Timers
317 0 0       0 if ($unit eq "ms") {
    0          
    0          
318 0         0 my $timers = $metrics->{timers};
319 0   0     0 $timers->{$key} ||= [];
320 0         0 push @{ $timers->{$key} }, $value;
  0         0  
321             }
322              
323             # Gauges
324             elsif ($unit eq "g") {
325 0         0 my $gauges = $metrics->{gauges};
326 0         0 $gauges->{$key} = $value;
327             }
328              
329             # Sets
330             elsif ($unit eq "s") {
331             # Treat set as a normal hash with undef keys
332             # to minimize memory consumption *and* insertion speed
333 0         0 my $sets = $metrics->{sets};
334 0   0     0 $sets->{$key} ||= {};
335 0         0 $sets->{$key}->{$value} = undef;
336             }
337              
338             # Counters
339             else {
340 0 0       0 if (defined $fields[2]) {
341 0 0       0 if ($fields[2] =~ m{^\@([\d\.]+)}) {
342 0         0 $sample_rate = $1 + 0;
343             }
344             else {
345 0         0 $logger->(warn => "Bad line: $bits[$i] in msg \"$m\"; has invalid sample rate");
346 0         0 $counters->{"${g_pref}.bad_lines_seen"}++;
347 0         0 $stats->{"messages"}->{"bad_lines_seen"}++;
348 0         0 next;
349             }
350             }
351 0   0     0 $counters->{$key} ||= 0;
352 0   0     0 $value ||= 1;
353 0         0 $value /= $sample_rate;
354 0         0 $counters->{$key} += $value;
355             }
356             }
357             }
358              
359 0         0 $stats->{"messages"}->{"last_msg_seen"} = time();
360             }
361              
362             sub handle_manager_command {
363 0     0 0 0 my ($self, $handle, $request) = @_;
364 0         0 my @cmdline = split(" ", trim($request));
365 0         0 my $cmd = shift @cmdline;
366 0         0 my $reply;
367              
368 0         0 $logger->(notice => "Received manager command '$cmd' (req=$request)");
369              
370 0 0       0 if ($cmd eq "help") {
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
371 0         0 $reply = (
372             "Commands: stats, counters, timers, gauges, delcounters, deltimers, delgauges, quit\015\012\015\012"
373             );
374             }
375             elsif ($cmd eq "stats") {
376 0         0 my $now = time;
377 0         0 my $uptime = $now - $self->{startup_time};
378 0         0 $reply = "uptime: $uptime\n";
379              
380             # Loop through the base stats
381 0         0 my $stats = $self->stats;
382              
383 0         0 for my $group (keys %{$stats}) {
  0         0  
384 0         0 for my $metric (keys %{$stats->{$group}}) {
  0         0  
385 0         0 my $val = $stats->{$group}->{$metric};
386 0 0       0 my $delta = $metric =~ m{^last_}
387             ? $now - $val
388             : $val;
389 0         0 $reply .= "${group}.${metric}: ${delta}\n";
390             }
391             }
392              
393             $self->foreach_backend(sub {
394 0     0   0 my $backend_status = $_[0]->status;
395 0 0 0     0 if ($backend_status && ref $backend_status eq "HASH") {
396 0         0 for (keys %{ $backend_status }) {
  0         0  
397 0         0 $reply .= sprintf("%s.%s: %s\n",
398             lc($_[0]->name),
399             $_ => $backend_status->{$_}
400             );
401             }
402             }
403 0         0 });
404              
405 0         0 $reply .= "END\n\n";
406             }
407             elsif ($cmd eq "counters") {
408 0         0 my $counters = $self->{metrics}->{counters};
409 0         0 $reply = $self->_json_emitter()->encode($counters);
410 0         0 $reply .= "\nEND\n\n";
411             }
412             elsif ($cmd eq "timers") {
413 0         0 my $timers = $self->{metrics}->{timers};
414 0         0 $reply = $self->_json_emitter()->encode($timers);
415 0         0 $reply .= "\nEND\n\n";
416             }
417             elsif ($cmd eq "gauges") {
418 0         0 my $gauges = $self->{metrics}->{gauges};
419 0         0 $reply = $self->_json_emitter()->encode($gauges);
420 0         0 $reply .= "\nEND\n\n";
421             }
422             elsif ($cmd eq "sets") {
423 0         0 my $sets = $self->{metrics}->{sets};
424 0         0 my $sets_as_lists = {};
425             # FIXME Not really happy about this...
426             # if you have huge sets, it's going to suck.
427             # If you have huge sets, probably statsd is not for you anyway.
428 0         0 for my $set (keys %{$sets}) {
  0         0  
429 0         0 $sets_as_lists->{$set} = [ keys %{ $sets->{$set} } ];
  0         0  
430             }
431 0         0 $reply = $self->_json_emitter()->encode($sets_as_lists);
432 0         0 $reply .= "\nEND\n\n";
433             }
434             elsif ($cmd eq "delcounters") {
435 0         0 my $counters = $self->{metrics}->{counters};
436 0         0 for my $name (@cmdline) {
437 0         0 delete $counters->{$name};
438 0         0 $reply .= "deleted: $name\n";
439             }
440 0         0 $reply .= "\nEND\n\n";
441             }
442             elsif ($cmd eq "deltimers") {
443 0         0 my $timers = $self->{metrics}->{timers};
444 0         0 for my $name (@cmdline) {
445 0         0 delete $timers->{$name};
446 0         0 $reply .= "deleted: $name\n";
447             }
448 0         0 $reply .= "\nEND\n\n";
449             }
450             elsif ($cmd eq "delgauges") {
451 0         0 my $gauges = $self->{metrics}->{gauges};
452 0         0 for my $name (@cmdline) {
453 0         0 delete $gauges->{$name};
454 0         0 $reply .= "deleted: $name\n";
455             }
456 0         0 $reply .= "\nEND\n\n";
457             }
458             elsif ($cmd eq "quit") {
459 0         0 undef $reply;
460 0         0 $handle->destroy();
461             }
462             else {
463 0         0 $reply = "ERROR\n";
464             }
465 0         0 return $reply;
466             }
467              
468             sub handle_manager_connection {
469 0     0 0 0 my ($self, $handle, $line) = @_;
470             #$logger->(notice => "Received mgmt command [$line]");
471 0 0       0 if (my $reply = $self->handle_manager_command($handle, $line)) {
472 0         0 $logger->(notice => "Sending mgmt reply [$reply]");
473 0         0 $handle->push_write($reply);
474             # Accept a new command on the same connection
475             $handle->push_read(line => sub {
476 0     0   0 handle_manager_connection($self, @_)
477 0         0 });
478             }
479             else {
480 0         0 $logger->(notice => "Shutting down socket");
481 0         0 $handle->push_write("\n");
482 0         0 $handle->destroy;
483             }
484             }
485              
486             sub init_backends {
487 0     0 0 0 my ($self) = @_;
488 0         0 my $backends = $self->config->{backends};
489 0 0 0     0 if (! $backends or ref $backends ne 'ARRAY') {
490 0         0 die "At least one backend is needed in your configuration";
491             }
492 0         0 for my $backend (@{ $backends }) {
  0         0  
493              
494             # Nodejs statsd expects a relative path
495 0 0       0 if ($backend =~ m{^ \./backends/ (.+) $}x) {
496 0         0 $backend = $1;
497             }
498              
499 0         0 my $pkg = $backend;
500 0 0       0 if ($backend =~ m{^ (\w+) $}x) {
501 0         0 $pkg = ucfirst lc $pkg;
502 0         0 $pkg = "Net::Statsd::Server::Backend::${pkg}";
503             }
504 0         0 my $mod = $pkg;
505 0         0 $mod =~ s{::}{/}g;
506 0         0 $mod .= ".pm";
507             eval {
508 0         0 require $mod ; 1
  0         0  
509 0 0       0 } or do {
510 0         0 $logger->(error=>"Backend ${backend} failed to load: $@");
511 0         0 next;
512             };
513 0         0 $self->register_backend($pkg);
514             }
515             }
516              
517             sub init_logger {
518 0     0 0 0 my ($self, $config) = @_;
519              
520 0   0     0 $config ||= {};
521              
522 0   0     0 my $backend = $config->{backend} || 'stdout';
523 0   0     0 my $level = lc($config->{level} || 'LOG_INFO');
524 0         0 $level =~ s{^log_}{};
525              
526 0 0       0 if ($backend eq 'stdout') {
    0          
527 0         0 $AnyEvent::Log::FILTER->level($level);
528             }
529             elsif ($backend eq 'syslog') {
530             # Syslog logging works commenting out the FILTER->level line
531 0         0 $AnyEvent::Log::COLLECT->attach(
532             AnyEvent::Log::Ctx->new(
533             level => $level,
534             log_to_syslog => "user",
535             )
536             );
537             }
538 0   0 0   0 $logger ||= sub { AE::log(shift(@_), shift(@_)) };
  0         0  
539             }
540              
541             sub logger {
542 0     0 0 0 return $logger;
543             }
544              
545             sub metrics {
546 0     0 0 0 $_[0]->{metrics};
547             }
548              
549             sub register_backend {
550 0     0 0 0 my ($self, $backend) = @_;
551 0   0     0 $self->{backends} ||= [];
552 0         0 my $backend_instance = $backend->new(
553             $self->_start_time_hi, $self->config,
554             );
555 0         0 $logger->(notice => "Initializing $backend backend");
556 0         0 push @{ $self->{backends} }, $backend_instance;
  0         0  
557             }
558              
559             sub foreach_backend {
560 0     0 0 0 my ($self, $callback) = @_;
561 0   0     0 my $backends = $self->{backends} || [];
562 0         0 for my $obj (@{ $backends }) {
  0         0  
563             eval {
564 0         0 $callback->($obj); 1;
  0         0  
565 0 0       0 } or do {
566 0         0 $logger->(error => "Failed callback on $obj backend: $@");
567             };
568             }
569             }
570              
571             sub reload_config {
572 0     0 0 0 my ($self) = @_;
573 0         0 delete $self->{config};
574 0         0 $logger->(warn => "Received SIGHUP: reloading configuration");
575 0         0 return $self->{config} = $self->config();
576             }
577              
578             sub setup_flush_timer {
579 0     0 0 0 my ($self) = @_;
580              
581 0   0     0 my $flush_interval = $self->config->{flushInterval}
582             || DEFAULT_FLUSH_INTERVAL;
583              
584 0         0 $flush_interval = $flush_interval / 1000;
585 0         0 $logger->(notice => "metrics flush will happen every ${flush_interval}s");
586              
587             my $flush_t = AE::timer $flush_interval, $flush_interval, sub {
588 0     0   0 $self->flush_metrics
589 0         0 };
590              
591 0         0 return $flush_t;
592             }
593              
594 1 50   1   5 sub _defined_or { defined $_[0] ? $_[0] : $_[1] }
595              
596             sub setup_keyflush_timer {
597 0     0 0 0 my ($self) = @_;
598              
599 0         0 my $conf_kf = $self->config->{keyFlush};
600 0         0 my $kf_interval = _defined_or($conf_kf->{interval}, 0);
601 0 0       0 return if $kf_interval <= 0;
602              
603             # Always milliseconds in the config!
604 0         0 $kf_interval /= 1000;
605              
606 0         0 my $kf_pct = _defined_or($conf_kf->{percent}, 100);
607 0         0 my $kf_log = $conf_kf->{log};
608              
609 0   0     0 $logger->(notice => "flushing top ${kf_pct}% keys to "
610             . ($kf_log || "stdout")
611             . " every ${kf_interval}s"
612             );
613              
614             my $kf_timer = AE::timer $kf_interval, $kf_interval, sub {
615 0     0   0 $self->flush_top_keys()
616 0         0 };
617              
618 0         0 return $kf_timer;
619             }
620              
621             sub flush_top_keys {
622 0     0 0 0 my ($self) = @_;
623              
624 0         0 my $conf_kf = _defined_or($self->config->{keyFlush}, {});
625 0         0 my $kf_interval = _defined_or($conf_kf->{interval}, 0);
626 0         0 $kf_interval /= 1000;
627              
628 0   0     0 my $kf_pct = $conf_kf->{percent} || 100;
629 0         0 my $kf_log = $conf_kf->{log};
630              
631 0         0 my @sorted_keys;
632 0         0 my $key_counter = $self->metrics->{keyCounter};
633 0         0 while (my ($k, $v) = each %{ $key_counter }) {
  0         0  
634 0         0 push @sorted_keys, [ $k, $v ];
635             }
636              
637 0         0 @sorted_keys = sort { $b->[1] <=> $a->[1] } @sorted_keys;
  0         0  
638              
639 0         0 my @time = localtime;
640 0         0 my $time_str = sprintf "%04d-%02d-%02d %02d:%02d:%02d",
641             $time[5] + 1900, $time[4] + 1, $time[3],
642             $time[2], $time[1], $time[0];
643              
644 0         0 my $log_message = "";
645              
646             # Only show the top keyFlush.percent keys
647 0         0 my $top_pct_limit = int(scalar(@sorted_keys) * $kf_pct / 100);
648 0         0 for my $i (0 .. $top_pct_limit - 1) {
649 0         0 $log_message .= sprintf "$time_str count=%d key=%s\n",
650             $sorted_keys[$i][1], $sorted_keys[$i][0];
651             }
652              
653 0 0       0 if ($kf_log) {
654 0 0       0 if (open my $log_fh, '>>', $kf_log) {
655 0         0 $log_fh->printflush($log_message);
656 0         0 $log_fh->close();
657             }
658             } else {
659 0         0 print $log_message;
660             }
661              
662             # Clear the counters
663 0         0 $self->metrics->{keyCounter} = {};
664              
665             }
666              
667             sub init_metrics {
668 0     0 0 0 my ($self) = @_;
669 0         0 my $config = $self->config;
670 0         0 $self->{metrics} = Net::Statsd::Server::Metrics->new($config);
671 0         0 return $self->{metrics};
672             }
673              
674             sub start_server {
675 0     0 0 0 my ($self, $config) = @_;
676              
677 0 0       0 if (! defined $config) {
678 0         0 $config = $self->config();
679             }
680              
681 0         0 $self->init_logger($config->{log});
682              
683 0   0     0 my $host = $config->{address} || '0.0.0.0';
684 0   0     0 my $port = $config->{port} || 8125;
685              
686 0   0     0 my $mgmt_host = $config->{mgmt_address} || '0.0.0.0';
687 0   0     0 my $mgmt_port = $config->{mgmt_port} || 8126;
688              
689 0         0 $self->init_backends();
690 0         0 $self->init_metrics();
691              
692             # Statsd clients interface (UDP)
693             $self->{server} = AnyEvent::Handle::UDP->new(
694             bind => [$host, $port],
695             on_recv => sub {
696 0     0   0 my ($data, $ae_handle, $client_addr) = @_;
697             #$logger->(debug => "Got data=$data self=$self");
698 0         0 my $reply = $self->handle_client_packet($data);
699 0         0 $ae_handle->push_send($reply, $client_addr);
700             },
701 0         0 );
702              
703             # Bump up SO_RCVBUF on UDP socket, to buffer up incoming
704             # UDP packets, to avoid significant packet loss under load.
705             # Read more: http://bit.ly/10eeFoE
706 0         0 if (RECEIVE_BUFFER_MB > 0) {
707             # On some systems this could fail (cpantesters reports)
708             # Have it emit a warning instead of throwing an exception
709 0 0       0 setsockopt($self->{server}->fh, SOL_SOCKET,
710             SO_RCVBUF, RECEIVE_BUFFER_MB * 1048576)
711             or warn "Couldn't set SO_RCVBUF: $!";
712             }
713              
714             # Management interface (TCP, for 'stats' command, etc...)
715             $self->{mgmtServer} = AnyEvent::Socket::tcp_server $mgmt_host, $mgmt_port, sub {
716 0 0   0   0 my ($fh, $host, $port) = @_
717             or die "Unable to connect: $!";
718              
719 0         0 my $handle; $handle = AnyEvent::Handle->new(
720             fh => $fh,
721             on_error => sub {
722 0         0 AE::log error => $_[2],
723             $_[0]->destroy;
724             },
725             on_eof => sub {
726 0         0 $handle->destroy;
727 0         0 AE::log info => "Done.",
728             },
729 0         0 );
730              
731             $handle->push_read(line => sub {
732 0         0 handle_manager_connection($self, @_)
733 0         0 });
734 0         0 };
735              
736 0         0 $logger->(notice => "statsd server started on ${host}:${port} (v${VERSION})");
737 0         0 $logger->(notice => "manager interface started on ${mgmt_host}:${mgmt_port}");
738              
739 0         0 my $f_ti = $self->setup_flush_timer;
740 0         0 my $kf_ti = $self->setup_keyflush_timer;
741              
742             # This will block waiting for
743             # incoming connections (TCP) or packets (UDP)
744 0         0 my $cv = AE::cv;
745 0         0 $cv->recv();
746             }
747              
748             sub stats {
749 0     0 0 0 $_[0]->{stats};
750             }
751              
752             sub trim {
753 8     8 0 8581 my $s = shift;
754 8 100       26 return unless defined $s;
755 7         26 $s =~ s{^\s+}{};
756 7         22 $s =~ s{\s+$}{};
757 7         16 return $s;
758             }
759              
760             1;
761              
762             __END__