File Coverage

blib/lib/Net/Async/Statsd/Client.pm
Criterion Covered Total %
statement 44 52 84.6
branch 7 10 70.0
condition 4 8 50.0
subroutine 17 21 80.9
pod 16 16 100.0
total 88 107 82.2


line stmt bran cond sub pod time code
1             package Net::Async::Statsd::Client;
2             $Net::Async::Statsd::Client::VERSION = '0.004';
3 2     2   172722 use strict;
  2         5  
  2         75  
4 2     2   11 use warnings;
  2         4  
  2         72  
5              
6 2     2   9306 use parent qw(IO::Async::Notifier);
  2         856  
  2         9  
7              
8             =head1 NAME
9              
10             Net::Async::Statsd::Client - asynchronous API for Etsy's statsd protocol
11              
12             =head1 VERSION
13              
14             Version 0.004
15              
16             =head1 SYNOPSIS
17              
18             use Future;
19             use IO::Async::Loop;
20             use Net::Async::Statsd::Client;
21             my $loop = IO::Async::Loop->new;
22             $loop->add(my $statsd = Net::Async::Statsd::Client->new(
23             host => 'localhost',
24             port => 3001,
25             ));
26             # Wait until the stats are written before proceeding
27             Future->needs_all(
28             $statsd->timing(
29             'some.task' => 133,
30             ),
31             $statsd->gauge(
32             'some.value' => 80,
33             )
34             )->get;
35             # Fire-and-forget stat, record 25% of the time:
36             $statsd->increment('startup', 0.25);
37              
38             =head1 DESCRIPTION
39              
40             Provides an asynchronous API for statsd.
41              
42             =head1 METHODS
43              
44             All public methods return a L indicating when the write has completed.
45             Since writes are UDP packets, there is no guarantee that the remote will
46             receive the value, so this is mostly intended as a way to detect when
47             statsd writes are slow.
48              
49             =cut
50              
51             =head2 timing
52              
53             Records timing information in milliseconds. Takes up to three parameters:
54              
55             =over 4
56              
57             =item * $k - the statsd key
58              
59             =item * $v - the elapsed time in milliseconds
60              
61             =item * $rate - optional sampling rate
62              
63             =back
64              
65             Only the integer part of the elapsed time will be sent.
66              
67             Example usage:
68              
69             $statsd->timing('some.key' => $ms, 0.1); # record this 10% of the time
70              
71             Returns a L which will be resolved when the write completes.
72              
73             =cut
74              
75             sub timing {
76 1     1 1 27 my ($self, $k, $v, $rate) = @_;
77              
78 1         6 $self->queue_stat(
79             $k => int($v) . '|ms',
80             $rate
81             );
82             }
83              
84             =head2 gauge
85              
86             Records a current value. Takes up to three parameters:
87              
88             =over 4
89              
90             =item * $k - the statsd key
91              
92             =item * $v - the new value
93              
94             =item * $rate - optional sampling rate
95              
96             =back
97              
98             Only the integer value will be sent.
99              
100             Example usage:
101              
102             $statsd->timing('some.key' => 123);
103              
104             Returns a L which will be resolved when the write completes.
105              
106             =cut
107              
108             sub gauge {
109 1     1 1 36 my ($self, $k, $v, $rate) = @_;
110              
111 1         8 $self->queue_stat(
112             $k => int($v) . '|g',
113             $rate
114             );
115             }
116              
117             =head2 delta
118              
119             Records changed value. Takes up to three parameters:
120              
121             =over 4
122              
123             =item * $k - the statsd key
124              
125             =item * $v - the change (positive or negative)
126              
127             =item * $rate - optional sampling rate
128              
129             =back
130              
131             Values are truncated to integers.
132              
133             Example usage:
134              
135             $statsd->timing('some.key' => -12);
136              
137             Returns a L which will be resolved when the write completes.
138              
139             =cut
140              
141             sub delta {
142 1     1 1 30 my ($self, $k, $v, $rate) = @_;
143              
144 1         5 $self->queue_stat(
145             $k => int($v) . '|c',
146             $rate
147             );
148             }
149              
150             =head2 count
151              
152             Alias for L.
153              
154             =cut
155              
156             # an alias for good measure
157             *count = *delta;
158              
159             =head2 increment
160              
161             Shortcut for L with a value of +1.
162              
163             =cut
164              
165             sub increment {
166 0     0 1 0 my ($self, $k, $rate) = @_;
167              
168 0         0 $self->queue_stat(
169             $k => '1|c',
170             $rate
171             );
172             }
173              
174             =head2 decrement
175              
176             Shortcut for L with a value of -1.
177              
178             =cut
179              
180             sub decrement {
181 0     0 1 0 my ($self, $k, $rate) = @_;
182              
183 0         0 $self->queue_stat(
184             $k => '-1|c',
185             $rate
186             );
187             }
188              
189             =head2 configure
190              
191             Standard L configuration - called on construction or
192             manually when values need updating.
193              
194             Accepts the following named parameters:
195              
196             =over 4
197              
198             =item * host - the host we'll connect to
199              
200             =item * port - the UDP port to send messages to
201              
202             =item * default_rate - default sampling rate when none is provided for a given call
203              
204             =item * prefix - string to prepend to any stats we record
205              
206             =back
207              
208             =cut
209              
210             sub configure {
211 2     2 1 101 my ($self, %args) = @_;
212 2         7 for (qw(port host default_rate prefix)) {
213 8 100       34 $self->{$_} = delete $args{$_} if exists $args{$_};
214             }
215 2         18 $self->SUPER::configure(%args);
216             }
217              
218             =head1 INTERNAL METHODS
219              
220             These methods are used internally, and are documented
221             for completeness. They may be of use when subclassing
222             this module.
223              
224             =cut
225              
226             =head2 queue_stat
227              
228             Queues a statistic for write.
229              
230             =cut
231              
232             sub queue_stat {
233 3     3 1 8 my ($self, $k, $v, $rate) = @_;
234              
235 3   33     19 $rate //= $self->default_rate;
236 3 50       10 return Future->wrap unless $self->sample($rate);
237              
238 3 50       14 $k = $self->{prefix} . '.' . $k if exists $self->{prefix};
239              
240             # Append rate if we're only sampling part of the data
241 3 50       8 $v .= '|@' . $rate if $rate < 1;
242 3         5 my $f;
243             $f = $self->statsd->then(sub {
244             # FIXME Someday IO::Async::Socket may support
245             # Futures for UDP send, update this if/when
246             # that happens.
247 3     3   402 shift->send("$k:$v");
248 3         165 Future->wrap
249 3     3   9 })->on_ready(sub { undef $f });
  3         103  
250             }
251              
252             =head2 sample
253              
254             Applies sampling based on the given rate - returns true if
255             we should record this, false otherwise.
256              
257             =cut
258              
259             sub sample {
260 60003     60003 1 188857 my ($self, $rate) = @_;
261 60003 100       134654 return 1 if rand() <= $rate;
262 32336         50217 return 0;
263             }
264              
265             =head2 default_rate
266              
267             Default sampling rate. Currently 1 if not overidden in constructor or L.
268              
269             =cut
270              
271 3   50 3 1 21 sub default_rate { shift->{default_rate} // 1 }
272              
273             =head2 port
274              
275             Statsd UDP port.
276              
277             =cut
278              
279 1     1 1 16 sub port { shift->{port} }
280              
281             =head2 host
282              
283             Statsd host to connect to.
284             =cut
285              
286 1     1 1 14 sub host { shift->{host} }
287              
288             sub statsd {
289 3     3 1 5 my ($self) = @_;
290 3   66     29 $self->{statsd} ||= do {
291 1         5 $self->connect
292             }
293             }
294              
295             =head2 connect
296              
297             Establishes the underlying UDP socket.
298              
299             =cut
300              
301             sub connect {
302 1     1 1 1 my ($self) = @_;
303             # IO::Async::Loop
304 1         10 $self->loop->connect(
305             family => 'inet',
306             socktype => 'dgram',
307             service => $self->port,
308             host => $self->host,
309             on_socket => $self->curry::on_socket,
310             );
311             }
312              
313             =head2 on_socket
314              
315             Called when the socket is established.
316              
317             =cut
318              
319             sub on_socket {
320 1     1 1 54916 my ($self, $sock) = @_;
321 1         13 $self->debug_printf("UDP socket established: %s", $sock->write_handle->sockhost_service);
322             # FIXME Don't really want this - we're sending only, no bi-directional shenanigans
323             # required, might need to replace ->connect with an IO::Async::Socket for this?
324 1         794 $sock->configure(
325             on_recv => $self->curry::weak::on_recv,
326             on_recv_error => $self->curry::weak::on_recv_error,
327             );
328 1         92 $self->add_child($sock);
329             }
330              
331             =head2 on_recv
332              
333             Called if we receive data.
334              
335             =cut
336              
337             sub on_recv {
338 0     0 1   my ($self, undef, $dgram, $addr) = @_;
339 0           $self->debug_printf("UDP packet [%s] received from %s", $dgram, join ':', $self->loop->resolver->getnameinfo(
340             addr => $addr,
341             numeric => 1,
342             dgram => 1,
343             ));
344             }
345              
346             =head2 on_recv_error
347              
348             Called if we had an error while receiving.
349              
350             =cut
351              
352             sub on_recv_error {
353 0     0 1   my ($self, undef, $err) = @_;
354 0           $self->debug_printf("UDP packet receive error: %s", $err);
355             }
356              
357             1;
358              
359             __END__