File Coverage

blib/lib/Net/Async/Statsd/Client.pm
Criterion Covered Total %
statement 44 52 84.6
branch 7 10 70.0
condition 4 8 50.0
subroutine 17 21 80.9
pod 16 16 100.0
total 88 107 82.2


line stmt bran cond sub pod time code
1             package Net::Async::Statsd::Client;
2             $Net::Async::Statsd::Client::VERSION = '0.005';
3 2     2   66743 use strict;
  2         4  
  2         59  
4 2     2   8 use warnings;
  2         3  
  2         56  
5              
6 2     2   794 use parent qw(IO::Async::Notifier);
  2         441  
  2         8  
7              
8             =head1 NAME
9              
10             Net::Async::Statsd::Client - asynchronous API for Etsy's statsd protocol
11              
12             =head1 VERSION
13              
14             version 0.004
15              
16             =head1 SYNOPSIS
17              
18             use Future;
19             use IO::Async::Loop;
20             use Net::Async::Statsd::Client;
21             my $loop = IO::Async::Loop->new;
22             $loop->add(my $statsd = Net::Async::Statsd::Client->new(
23             host => 'localhost',
24             port => 3001,
25             ));
26             # Wait until the stats are written before proceeding
27             Future->needs_all(
28             $statsd->timing(
29             'some.task' => 133,
30             ),
31             $statsd->gauge(
32             'some.value' => 80,
33             )
34             )->get;
35             # Fire-and-forget stat, record 25% of the time:
36             $statsd->increment('startup', 0.25);
37              
38             =head1 DESCRIPTION
39              
40             Provides an asynchronous API for statsd.
41              
42             =head1 METHODS
43              
44             All public methods return a L indicating when the write has completed.
45             Since writes are UDP packets, there is no guarantee that the remote will
46             receive the value, so this is mostly intended as a way to detect when
47             statsd writes are slow.
48              
49             =cut
50              
51             =head2 timing
52              
53             Records timing information in milliseconds. Takes up to three parameters:
54              
55             =over 4
56              
57             =item * $k - the statsd key
58              
59             =item * $v - the elapsed time in milliseconds
60              
61             =item * $rate - optional sampling rate
62              
63             =back
64              
65             Only the integer part of the elapsed time will be sent.
66              
67             Example usage:
68              
69             $statsd->timing('some.key' => $ms, 0.1); # record this 10% of the time
70              
71             Returns a L which will be resolved when the write completes.
72              
73             =cut
74              
75             sub timing {
76 1     1 1 13 my ($self, $k, $v, $rate) = @_;
77              
78 1         4 $self->queue_stat(
79             $k => int($v) . '|ms',
80             $rate
81             );
82             }
83              
84             =head2 gauge
85              
86             Records a current value. Takes up to three parameters:
87              
88             =over 4
89              
90             =item * $k - the statsd key
91              
92             =item * $v - the new value
93              
94             =item * $rate - optional sampling rate
95              
96             =back
97              
98             Only the integer value will be sent.
99              
100             Example usage:
101              
102             $statsd->timing('some.key' => 123);
103              
104             Returns a L which will be resolved when the write completes.
105              
106             =cut
107              
108             sub gauge {
109 1     1 1 27 my ($self, $k, $v, $rate) = @_;
110              
111 1         6 $self->queue_stat(
112             $k => int($v) . '|g',
113             $rate
114             );
115             }
116              
117             =head2 delta
118              
119             Records changed value. Takes up to three parameters:
120              
121             =over 4
122              
123             =item * $k - the statsd key
124              
125             =item * $v - the change (positive or negative)
126              
127             =item * $rate - optional sampling rate
128              
129             =back
130              
131             Values are truncated to integers.
132              
133             Example usage:
134              
135             $statsd->timing('some.key' => -12);
136              
137             Returns a L which will be resolved when the write completes.
138              
139             =cut
140              
141             sub delta {
142 1     1 1 25 my ($self, $k, $v, $rate) = @_;
143              
144 1         5 $self->queue_stat(
145             $k => int($v) . '|c',
146             $rate
147             );
148             }
149              
150             =head2 count
151              
152             Alias for L.
153              
154             =cut
155              
156             # an alias for good measure
157             *count = *delta;
158              
159             =head2 increment
160              
161             Shortcut for L with a value of +1.
162              
163             =cut
164              
165             sub increment {
166 0     0 1 0 my ($self, $k, $rate) = @_;
167              
168 0         0 $self->queue_stat(
169             $k => '1|c',
170             $rate
171             );
172             }
173              
174             =head2 decrement
175              
176             Shortcut for L with a value of -1.
177              
178             =cut
179              
180             sub decrement {
181 0     0 1 0 my ($self, $k, $rate) = @_;
182              
183 0         0 $self->queue_stat(
184             $k => '-1|c',
185             $rate
186             );
187             }
188              
189             =head2 configure
190              
191             Standard L configuration - called on construction or
192             manually when values need updating.
193              
194             Accepts the following named parameters:
195              
196             =over 4
197              
198             =item * host - the host we'll connect to
199              
200             =item * port - the UDP port to send messages to
201              
202             =item * default_rate - default sampling rate when none is provided for a given call
203              
204             =item * prefix - string to prepend to any stats we record
205              
206             =back
207              
208             =cut
209              
210             sub configure {
211 2     2 1 93 my ($self, %args) = @_;
212 2         7 for (qw(port host default_rate prefix)) {
213 8 100       24 $self->{$_} = delete $args{$_} if exists $args{$_};
214             }
215 2         13 $self->SUPER::configure(%args);
216             }
217              
218             =head1 INTERNAL METHODS
219              
220             These methods are used internally, and are documented
221             for completeness. They may be of use when subclassing
222             this module.
223              
224             =cut
225              
226             =head2 queue_stat
227              
228             Queues a statistic for write.
229              
230             =cut
231              
232             sub queue_stat {
233 3     3 1 5 my ($self, $k, $v, $rate) = @_;
234              
235 3   33     14 $rate //= $self->default_rate;
236 3 50       6 return Future->wrap unless $self->sample($rate);
237              
238 3 50       6 $k = $self->{prefix} . '.' . $k if exists $self->{prefix};
239              
240             # Append rate if we're only sampling part of the data
241 3 50       7 $v .= '|@' . $rate if $rate < 1;
242 3         4 my $f;
243             $f = $self->statsd->then(sub {
244             # FIXME Someday IO::Async::Socket may support
245             # Futures for UDP send, update this if/when
246             # that happens.
247 3     3   291 shift->send("$k:$v");
248 3         133 Future->wrap
249 3     3   7 })->on_ready(sub { undef $f });
  3         89  
250             }
251              
252             =head2 sample
253              
254             Applies sampling based on the given rate - returns true if
255             we should record this, false otherwise.
256              
257             =cut
258              
259             sub sample {
260 60003     60003 1 115601 my ($self, $rate) = @_;
261 60003 100       81969 return 1 if rand() <= $rate;
262 32336         26806 return 0;
263             }
264              
265             =head2 default_rate
266              
267             Default sampling rate. Currently 1 if not overidden in constructor or L.
268              
269             =cut
270              
271 3   50 3 1 15 sub default_rate { shift->{default_rate} // 1 }
272              
273             =head2 port
274              
275             Statsd UDP port.
276              
277             =cut
278              
279 1     1 1 7 sub port { shift->{port} }
280              
281             =head2 host
282              
283             Statsd host to connect to.
284              
285             =cut
286              
287 1     1 1 10 sub host { shift->{host} }
288              
289             sub statsd {
290 3     3 1 4 my ($self) = @_;
291 3   66     16 $self->{statsd} ||= do {
292 1         3 $self->connect
293             }
294             }
295              
296             =head2 connect
297              
298             Establishes the underlying UDP socket.
299              
300             =cut
301              
302             sub connect {
303 1     1 1 1 my ($self) = @_;
304             # IO::Async::Loop
305 1         3 $self->loop->connect(
306             family => 'inet',
307             socktype => 'dgram',
308             service => $self->port,
309             host => $self->host,
310             on_socket => $self->curry::on_socket,
311             );
312             }
313              
314             =head2 on_socket
315              
316             Called when the socket is established.
317              
318             =cut
319              
320             sub on_socket {
321 1     1 1 28291 my ($self, $sock) = @_;
322 1         10 $self->debug_printf("UDP socket established: %s", $sock->write_handle->sockhost_service);
323             # FIXME Don't really want this - we're sending only, no bi-directional shenanigans
324             # required, might need to replace ->connect with an IO::Async::Socket for this?
325 1         445 $sock->configure(
326             on_recv => $self->curry::weak::on_recv,
327             on_recv_error => $self->curry::weak::on_recv_error,
328             );
329 1         66 $self->add_child($sock);
330             }
331              
332             =head2 on_recv
333              
334             Called if we receive data.
335              
336             =cut
337              
338             sub on_recv {
339 0     0 1   my ($self, undef, $dgram, $addr) = @_;
340 0           $self->debug_printf("UDP packet [%s] received from %s", $dgram, join ':', $self->loop->resolver->getnameinfo(
341             addr => $addr,
342             numeric => 1,
343             dgram => 1,
344             ));
345             }
346              
347             =head2 on_recv_error
348              
349             Called if we had an error while receiving.
350              
351             =cut
352              
353             sub on_recv_error {
354 0     0 1   my ($self, undef, $err) = @_;
355 0           $self->debug_printf("UDP packet receive error: %s", $err);
356             }
357              
358             1;
359              
360             __END__