File Coverage

blib/lib/Mojo/Graphite/Writer.pm
Criterion Covered Total %
statement 83 96 86.4
branch 18 22 81.8
condition 7 14 50.0
subroutine 14 16 87.5
pod 3 3 100.0
total 125 151 82.7


line stmt bran cond sub pod time code
1             package Mojo::Graphite::Writer;
2              
3 2     2   610793 use Mojo::Base -base;
  2         12  
  2         16  
4              
5 2     2   352 use feature 'current_sub';
  2         4  
  2         55  
6              
7 2     2   13 use Carp ();
  2         4  
  2         31  
8 2     2   547 use Mojo::IOLoop;
  2         153091  
  2         16  
9 2     2   98 use Mojo::Promise;
  2         4  
  2         19  
10              
11 2     2   79 use constant DEBUG => $ENV{MOJO_GRAPHITE_WRITER_DEBUG};
  2         4  
  2         1713  
12              
13             our $VERSION = '0.08';
14             $VERSION = eval $VERSION;
15              
16             has address => sub { Carp::croak 'address is required' };
17             has batch_size => 100;
18             has preprocess => sub { \&_preprocess };
19             has port => 2003;
20              
21             sub close {
22 0     0 1 0 my $self = shift;
23 0         0 my $stream = $self->{stream};
24 0         0 $stream->close;
25 0         0 return $self;
26             }
27              
28             sub connect {
29 5     5 1 4413 my ($self, %args) = @_;
30 5         15 my $p = Mojo::Promise->new;
31              
32             # Fork-safety
33 5 100 66     147 delete @$self{qw(pid stream)} unless ($self->{pid} //= $$) eq $$;
34              
35 5 100       16 if (my $stream = $self->{stream}) {
36 3         5 say STDERR "Reusing existing Graphite connection" if DEBUG;
37 3         8 $p->resolve($stream);
38             } else {
39 2   33     12 $args{address} //= $self->address;
40 2   33     20 $args{port} //= $self->port;
41 2         13 say STDERR "Connecting to Graphite on $args{address}:$args{port}" if DEBUG;
42             Mojo::IOLoop->client(%args, sub {
43 2     2   2584 my (undef, $err, $stream) = @_;
44 2 50       7 if ($err) {
45 0         0 say STDERR "Error opening Graphite socket: $err" if DEBUG;
46 0         0 return $p->reject($err);
47             }
48 2         3 say STDERR "Graphite socket opened" if DEBUG;
49 2         3 $stream->on(write => sub { say STDERR "Writing @{[length $_[1]]} bytes to Graphite" }) if DEBUG;
50             $stream->on(close => sub {
51 0         0 say STDERR "Graphite socket closed" if DEBUG;
52 0         0 delete $self->{stream};
53 2         13 });
54 2         13 $self->{stream} = $stream;
55 2         8 $p->resolve($stream);
56 2         17 });
57             }
58 5         722 return $p;
59             }
60              
61             sub write {
62 3     3 1 7043 my ($self, @metrics) = @_;
63 3         24 my $p = Mojo::Promise->new;
64 3 50       118 if (my $cb = $self->preprocess) {
65             # need parens on ref to disambiguate on older perls
66 3 100       21 @metrics = map { ref() ? $cb->($_) : $_ } @metrics;
  8         26  
67             }
68 3         6 push @{ $self->{queue} }, [\@metrics, $p];
  3         26  
69 3         13 $self->_write;
70 3         287 return $p;
71             }
72              
73             sub _preprocess {
74             # N.B. this function isn't called on raw strings
75             # clone the array contents so as not to modify originals
76 4     4   8 my @metric = @{$_[0]};
  4         10  
77              
78             # default to current time
79 4 100       12 $metric[2] = _time() unless defined $metric[2];
80              
81             # format tags, append to name
82 4 100 66     13 if ($metric[3] && keys %{$metric[3]}) {
  3         12  
83 2     2   23 no warnings 'uninitialized';
  2         5  
  2         572  
84             $metric[0] .= ';' . join ';',
85             # clean up invalid characters
86 6         26 map { s/\(|\)//gr }
87 6         18 map { s/\s+/_/gr }
88             # basic structure
89 6         17 map { "$_=$metric[3]{$_}" }
90 3         7 sort keys %{$metric[3]};
  3         11  
91             }
92              
93 4         18 return "$metric[0] $metric[1] $metric[2]";
94             }
95              
96             # better overridablility in testing, yes I hate that but ...
97 2     2   937 BEGIN { *_time = \&CORE::time }
98              
99             sub _write {
100 3     3   8 my $self = shift;
101 3 50 50     5 return unless @{ $self->{queue} ||= [] };
  3         13  
102              
103 3 50       9 return if $self->{writing};
104 3         7 $self->{writing} = 1;
105              
106             $self->connect->then(
107             sub {
108 3     3   1367 my $stream = shift;
109             my $write = sub {
110 6         1223 my $queue = $self->{queue};
111              
112             # this batch is done
113 6 100       10 unless (@{ $queue->[0][0] }) {
  6         20  
114 3         8 my $item = shift @$queue;
115 3         6 my $p = $item->[1];
116 3         18 $p->resolve;
117             }
118              
119             # queue is empty
120 6 100       192 unless (@$queue) {
121 3         7 $self->{writing} = 0;
122 3         7 return;
123             }
124              
125 3         7 my $string = join '', map { chomp; "$_\n" } splice @{ $queue->[0][0] }, 0, $self->batch_size;
  8         28  
  8         25  
  3         11  
126 3         16 $stream->write($string, __SUB__);
127 3         17 };
128              
129 3         8 $write->();
130             },
131             sub {
132 0     0     my $err = shift;
133 0           $_->[1]->reject($err) for @{ $self->{queue} };
  0            
134 0           $self->{queue} = [];
135 0           $self->{writing} = 0;
136             }
137 3         8 );
138             }
139              
140             1;
141              
142             =head1 NAME
143              
144             Mojo::Graphite::Writer - A non-blocking Graphite metric writer using the Mojo stack
145              
146             =head1 SYNOPSIS
147              
148             my $graphite = Mojo::Graphite::Writer->new(address => 'graphite.myhost.com');
149             my $time = time;
150             $graphite->write(
151             "my.metric.one 1 $time",
152             "my.metric.two 2 $time",
153             ...
154             );
155              
156             # preprocessing
157             $graphite->write(
158             ['my.metric.three', 3],
159             ['my.metric.four', 4, $time],
160             ['my.metric.five', 5, undef, {foo => 'bar'}],
161             ...
162             );
163              
164             =head1 DESCRIPTION
165              
166             L is a non-blocking client for feeding data to the Graphite metrics collector.
167             This simple module is meant to aid in formattting, batching, and queuing writes to the server in a fork-safe way.
168              
169             =head1 ATTRIBUTES
170              
171             L inherits all attributes from L and implements the following new ones.
172              
173             =head2 address
174              
175             Address of the target Graphite server.
176             Required.
177              
178             =head2 batch_size
179              
180             The number of metrics to send in each write batch.
181             Default is 100.
182              
183             =head2 preprocess
184              
185             A callback that is used to process a metric specified as an arrayref, the callback is not called on raw strings.
186             The callback is passed the array reference as its only argument, it should return a string to be written, it need not end with a newline.
187              
188             The default callback expects a metric arrayref to contain a metric name and a value in the first two slots.
189             If the time is not specified in the third slot (or is undef) then the current time will be used.
190             If the fourth slot contains a non-empty hashref then those will be treated as key-value tags.
191             The tags will be cleaned up, removing parenthesis characters and converting spaces to underscores.
192             They will then be formatted by joining keys and values with an equal sign and joined to the metric name with semicolons.
193              
194             Preprocessing can be fully disabled by setting the attribute to a false value.
195             Passing an array reference without a preprocessing callback will probably not do anything useful.
196              
197             =head2 port
198              
199             Port of the target Graphite server.
200             Default is C<2003>.
201              
202             =head1 METHODS
203              
204             L inherits all methods from L and implements the following new ones.
205              
206             =head2 close
207              
208             Close the current connection to L.
209              
210             =head2 connect
211              
212             Open a new connection to L:L using L.
213             Any additional arguments are passed through to that method.
214             Returns a L that resolves with the L object of the connection.
215              
216             Note that if the client is already connected, the promise is resolved again with the same stream and will until that stream is closed.
217             In this way, for simple connections, you may simple call L while for more complex ones, you may open the connction using this method with additional arguments if needed and then call L later.
218              
219             =head2 write
220              
221             Write metrics to the L-ed graphite server.
222             Metrics are queued and written to the server in a non-blocking way, in the order that L is called.
223              
224             Metrics are strings of the form C as documented as L<"the plaintext protocol"|https://graphite.readthedocs.io/en/latest/feeding-carbon.html#the-plaintext-protocol>.
225             Each string is one metric.
226             It will be line-ending normalized, no newline is required.
227              
228             Metrics may also be specified as an array reference.
229             If so they will be preprocessed using the callback in L which will transform it to a string to be written as documented above.
230             Preprocessing occurs immediately during the call to write.
231              
232             Writes are batched in groups of size L.
233             If the writer is not already connected, calling write will implicitly call L.
234              
235             Returns a L that will be resolved when the metrics passed B are written.
236             The promise is rejected if any write in the write queue fails, even if it is not from the write call.
237              
238             =head1 FUTURE WORK
239              
240             Future work may include
241              
242             =over
243              
244             =item *
245              
246             Possibly a blocking api, though this is questionable
247              
248             =back
249              
250             =head1 SEE ALSO
251              
252             =over
253              
254             =item *
255              
256             L
257              
258             =back
259              
260             =head1 THANKS
261              
262             This module's development was sponsored by L.
263              
264             =head1 SOURCE REPOSITORY
265              
266             L
267              
268             =head1 AUTHOR
269              
270             Joel Berger, Ejoel.a.berger@gmail.comE
271              
272             =head1 CONTRIBUTORS
273              
274             None yet.
275              
276             =head1 THANKS
277              
278             Mohammad S Anwar (manwar)
279              
280             =head1 COPYRIGHT AND LICENSE
281              
282             Copyright (C) 2019 by L and L
283              
284             This library is free software; you can redistribute it and/or modify it under the same terms as Perl itself.
285              
286