File Coverage

blib/lib/Mojo/Graphite/Writer.pm
Criterion Covered Total %
statement 82 95 86.3
branch 18 22 81.8
condition 7 14 50.0
subroutine 13 15 86.6
pod 3 3 100.0
total 123 149 82.5


line stmt bran cond sub pod time code
1             package Mojo::Graphite::Writer;
2              
3 2     2   617750 use Mojo::Base -base;
  2         11  
  2         14  
4              
5 2     2   442 use feature 'current_sub';
  2         5  
  2         58  
6              
7 2     2   11 use Carp ();
  2         8  
  2         49  
8 2     2   537 use Mojo::IOLoop;
  2         156556  
  2         15  
9 2     2   99 use Mojo::Promise;
  2         5  
  2         18  
10              
11 2     2   81 use constant DEBUG => $ENV{MOJO_GRAPHITE_WRITER_DEBUG};
  2         5  
  2         1838  
12              
13             our $VERSION = '0.07';
14             $VERSION = eval $VERSION;
15              
16             has address => sub { Carp::croak 'address is required' };
17             has batch_size => 100;
18             has preprocess => sub { \&_preprocess };
19             has port => 2003;
20              
21             sub close {
22 0     0 1 0 my $self = shift;
23 0         0 my $stream = $self->{stream};
24 0         0 $stream->close;
25 0         0 return $self;
26             }
27              
28             sub connect {
29 5     5 1 4210 my ($self, %args) = @_;
30 5         17 my $p = Mojo::Promise->new;
31              
32             # Fork-safety
33 5 100 66     163 delete @$self{qw(pid stream)} unless ($self->{pid} //= $$) eq $$;
34              
35 5 100       15 if (my $stream = $self->{stream}) {
36 3         6 say STDERR "Reusing existing Graphite connection" if DEBUG;
37 3         7 $p->resolve($stream);
38             } else {
39 2   33     14 $args{address} //= $self->address;
40 2   33     36 $args{port} //= $self->port;
41 2         13 say STDERR "Connecting to Graphite on $args{address}:$args{port}" if DEBUG;
42             Mojo::IOLoop->client(%args, sub {
43 2     2   3483 my (undef, $err, $stream) = @_;
44 2 50       6 if ($err) {
45 0         0 say STDERR "Error opening Graphite socket: $err" if DEBUG;
46 0         0 return $p->reject($err);
47             }
48 2         5 say STDERR "Graphite socket opened" if DEBUG;
49 2         2 $stream->on(write => sub { say STDERR "Writing @{[length $_[1]]} bytes to Graphite" }) if DEBUG;
50             $stream->on(close => sub {
51 0         0 say STDERR "Graphite socket closed" if DEBUG;
52 0         0 delete $self->{stream};
53 2         28 });
54 2         15 $self->{stream} = $stream;
55 2         9 $p->resolve($stream);
56 2         19 });
57             }
58 5         811 return $p;
59             }
60              
61             sub write {
62 3     3 1 8815 my ($self, @metrics) = @_;
63 3         23 my $p = Mojo::Promise->new;
64 3 50       122 if (my $cb = $self->preprocess) {
65 3 100       17 @metrics = map { ref ? $cb->($_) : $_ } @metrics;
  8         26  
66             }
67 3         7 push @{ $self->{queue} }, [\@metrics, $p];
  3         11  
68 3         28 $self->_write;
69 3         247 return $p;
70             }
71              
72             sub _preprocess {
73             # N.B. this function isn't called on raw strings
74             # clone the array contents so as not to modify originals
75 4     4   8 my @metric = @{$_[0]};
  4         9  
76              
77             # default to current time
78 4 100       11 $metric[2] = time unless defined $metric[2];
79              
80             # format tags, append to name
81 4 100 66     12 if ($metric[3] && keys %{$metric[3]}) {
  3         24  
82 2     2   28 no warnings 'uninitialized';
  2         4  
  2         1375  
83             $metric[0] .= ';' . join ';',
84             # clean up invalid characters
85 6         25 map { s/\(|\)//gr }
86 6         18 map { s/\s+/_/gr }
87             # basic structure
88 6         16 map { "$_=$metric[3]{$_}" }
89 3         7 sort keys %{$metric[3]};
  3         14  
90             }
91              
92 4         19 return "$metric[0] $metric[1] $metric[2]";
93             }
94              
95             sub _write {
96 3     3   8 my $self = shift;
97 3 50 50     6 return unless @{ $self->{queue} ||= [] };
  3         13  
98              
99 3 50       9 return if $self->{writing};
100 3         7 $self->{writing} = 1;
101              
102             $self->connect->then(
103             sub {
104 3     3   708 my $stream = shift;
105             my $write = sub {
106 6         1228 my $queue = $self->{queue};
107              
108             # this batch is done
109 6 100       8 unless (@{ $queue->[0][0] }) {
  6         21  
110 3         5 my $item = shift @$queue;
111 3         7 my $p = $item->[1];
112 3         10 $p->resolve;
113             }
114              
115             # queue is empty
116 6 100       191 unless (@$queue) {
117 3         7 $self->{writing} = 0;
118 3         6 return;
119             }
120              
121 3         7 my $string = join '', map { chomp; "$_\n" } splice @{ $queue->[0][0] }, 0, $self->batch_size;
  8         27  
  8         40  
  3         11  
122 3         15 $stream->write($string, __SUB__);
123 3         16 };
124              
125 3         7 $write->();
126             },
127             sub {
128 0     0     my $err = shift;
129 0           $_->[1]->reject($err) for @{ $self->{queue} };
  0            
130 0           $self->{queue} = [];
131 0           $self->{writing} = 0;
132             }
133 3         10 );
134             }
135              
136             1;
137              
138             =head1 NAME
139              
140             Mojo::Graphite::Writer - A non-blocking Graphite metric writer using the Mojo stack
141              
142             =head1 SYNOPSIS
143              
144             my $graphite = Mojo::Graphite::Writer->new(address => 'graphite.myhost.com');
145             my $time = time;
146             $graphite->write(
147             "my.metric.one 1 $time",
148             "my.metric.two 2 $time",
149             ...
150             );
151              
152             # preprocessing
153             $graphite->write(
154             ['my.metric.three', 3],
155             ['my.metric.four', 4, $time],
156             ['my.metric.five', 5, undef, {foo => 'bar'}],
157             ...
158             );
159              
160             =head1 DESCRIPTION
161              
162             L<Mojo::Graphite::Writer> is a non-blocking client for feeding data to the Graphite metrics collector.
163             This simple module is meant to aid in formatting, batching, and queuing writes to the server in a fork-safe way.
164              
165             =head1 ATTRIBUTES
166              
167             L<Mojo::Graphite::Writer> inherits all attributes from L<Mojo::Base> and implements the following new ones.
168              
169             =head2 address
170              
171             Address of the target Graphite server.
172             Required.
173              
174             =head2 batch_size
175              
176             The number of metrics to send in each write batch.
177             Default is 100.
178              
179             =head2 preprocess
180              
181             A callback that is used to process a metric specified as an arrayref; the callback is not called on raw strings.
182             The callback is passed the array reference as its only argument and should return a string to be written; the string need not end with a newline.
183              
184             The default callback expects a metric arrayref to contain a metric name and a value in the first two slots.
185             If the time is not specified in the third slot (or is undef) then the current time will be used.
186             If the fourth slot contains a non-empty hashref then those will be treated as key-value tags.
187             The tags will be cleaned up, removing parenthesis characters and converting spaces to underscores.
188             They will then be formatted by joining keys and values with an equal sign and joined to the metric name with semicolons.
189              
190             Preprocessing can be fully disabled by setting the attribute to a false value.
191             Passing an array reference without a preprocessing callback will probably not do anything useful.
192              
193             =head2 port
194              
195             Port of the target Graphite server.
196             Default is C<2003>.
197              
198             =head1 METHODS
199              
200             L<Mojo::Graphite::Writer> inherits all methods from L<Mojo::Base> and implements the following new ones.
201              
202             =head2 close
203              
204             Close the current connection to L</address>.
205              
206             =head2 connect
207              
208             Open a new connection to L</address>:L</port> using L<Mojo::IOLoop/client>.
209             Any additional arguments are passed through to that method.
210             Returns a L<Mojo::Promise> that resolves with the L<Mojo::IOLoop::Stream> object of the connection.
211              
212             Note that if the client is already connected, the promise is resolved with the same stream, and this will continue to be the case until that stream is closed.
213             In this way, for simple connections, you may simply call L</write>, while for more complex ones you may open the connection using this method with additional arguments if needed and then call L</write> later.
214              
215             =head2 write
216              
217             Write metrics to the L</connect>-ed graphite server.
218             Metrics are queued and written to the server in a non-blocking way, in the order that L</write> is called.
219              
220             Metrics are strings of the form C<path value time> as documented as L<"the plaintext protocol"|https://graphite.readthedocs.io/en/latest/feeding-carbon.html#the-plaintext-protocol>.
221             Each string is one metric.
222             It will be line-ending normalized, no newline is required.
223              
224             Metrics may also be specified as an array reference.
225             If so they will be preprocessed using the callback in L</preprocess> which will transform it to a string to be written as documented above.
226             Preprocessing occurs immediately during the call to write.
227              
228             Writes are batched in groups of size L</batch_size>.
229             If the writer is not already connected, calling write will implicitly call L</connect>.
230              
231             Returns a L<Mojo::Promise> that will be resolved when the metrics passed B<in this write call> are written.
232             The promise is rejected if any write in the write queue fails, even if it is not from the write call.
233              
234             =head1 FUTURE WORK
235              
236             Future work may include
237              
238             =over
239              
240             =item *
241              
242             Possibly a blocking api, though this is questionable
243              
244             =back
245              
246             =head1 SEE ALSO
247              
248             =over
249              
250             =item *
251              
252             L
253              
254             =back
255              
256             =head1 THANKS
257              
258             This module's development was sponsored by L.
259              
260             =head1 SOURCE REPOSITORY
261              
262             L
263              
264             =head1 AUTHOR
265              
266             Joel Berger, E<lt>joel.a.berger@gmail.comE<gt>
267              
268             =head1 CONTRIBUTORS
269              
270             None yet.
271              
272             =head1 THANKS
273              
274             Mohammad S Anwar (manwar)
275              
276             =head1 COPYRIGHT AND LICENSE
277              
278             Copyright (C) 2019 by L and L
279              
280             This library is free software; you can redistribute it and/or modify it under the same terms as Perl itself.
281              
282