File Coverage

blib/lib/Mojo/Graphite/Writer.pm
Criterion Covered Total %
statement 64 77 83.1
branch 9 12 75.0
condition 3 8 37.5
subroutine 11 13 84.6
pod 3 3 100.0
total 90 113 79.6


line stmt bran cond sub pod time code
1             package Mojo::Graphite::Writer;
2              
3 2     2   605253 use Mojo::Base -base;
  2         12  
  2         15  
4              
5 2     2   372 use feature 'current_sub';
  2         7  
  2         55  
6              
7 2     2   22 use Carp ();
  2         5  
  2         30  
8 2     2   527 use Mojo::IOLoop;
  2         151116  
  2         16  
9 2     2   90 use Mojo::Promise;
  2         4  
  2         19  
10              
11 2     2   99 use constant DEBUG => $ENV{MOJO_GRAPHITE_WRITER_DEBUG};
  2         4  
  2         2276  
12              
13             our $VERSION = '0.05';
14             $VERSION = eval $VERSION;
15              
16             has address => sub { Carp::croak 'address is required' };
17             has batch_size => 100;
18             has port => 2003;
19              
20             sub close {
21 0     0 1 0 my $self = shift;
22 0         0 my $stream = $self->{stream};
23 0         0 $stream->close;
24 0         0 return $self;
25             }
26              
27             sub connect {
28 2     2 1 5 my ($self, %args) = @_;
29 2         11 my $p = Mojo::Promise->new;
30 2 100       52 if (my $stream = $self->{stream}) {
31 1         3 say STDERR "Reusing existing Graphite connection" if DEBUG;
32 1         4 $p->resolve($stream);
33             } else {
34 1   33     8 $args{address} //= $self->address;
35 1   33     12 $args{port} //= $self->port;
36 1         6 say STDERR "Connecting to Graphite on $args{address}:$args{port}" if DEBUG;
37             Mojo::IOLoop->client(%args, sub {
38 1     1   1554 my (undef, $err, $stream) = @_;
39 1 50       4 if ($err) {
40 0         0 say STDERR "Error opening Graphite socket: $err" if DEBUG;
41 0         0 return $p->reject($err);
42             }
43 1         2 say STDERR "Graphite socket opened" if DEBUG;
44 1         2 $stream->on(write => sub { say STDERR "Writing @{[length $_[1]]} bytes to Graphite" }) if DEBUG;
45             $stream->on(close => sub {
46 0         0 say STDERR "Graphite socket closed" if DEBUG;
47 0         0 delete $self->{stream};
48 1         9 });
49 1         6 $self->{stream} = $stream;
50 1         6 $p->resolve($stream);
51 1         22 });
52             }
53 2         405 return $p;
54             }
55              
56             sub write {
57 2     2 1 4150 my ($self, @metrics) = @_;
58 2         15 my $p = Mojo::Promise->new;
59 2         92 push @{ $self->{queue} }, [\@metrics, $p];
  2         9  
60 2         9 $self->_write;
61 2         184 return $p;
62             }
63              
64             sub _write {
65 2     2   3 my $self = shift;
66 2 50 50     4 return unless @{ $self->{queue} ||= [] };
  2         11  
67              
68 2 50       7 return if $self->{writing};
69 2         3 $self->{writing} = 1;
70              
71             $self->connect->then(
72             sub {
73 2     2   1038 my $stream = shift;
74             my $write = sub {
75 4         865 my $queue = $self->{queue};
76              
77             # this batch is done
78 4 100       6 unless (@{ $queue->[0][0] }) {
  4         13  
79 2         5 my $item = shift @$queue;
80 2         6 my $p = $item->[1];
81 2         5 $p->resolve;
82             }
83              
84             # queue is empty
85 4 100       127 unless (@$queue) {
86 2         3 $self->{writing} = 0;
87 2         8 return;
88             }
89              
90 2         36 my $string = join '', map { chomp; "$_\n" } splice @{ $queue->[0][0] }, 0, $self->batch_size;
  4         17  
  4         16  
  2         11  
91 2         10 $stream->write($string, __SUB__);
92 2         12 };
93              
94 2         6 $write->();
95             },
96             sub {
97 0     0     my $err = shift;
98 0           $_->[1]->reject($err) for @{ $self->{queue} };
  0            
99 0           $self->{queue} = [];
100 0           $self->{writing} = 0;
101             }
102 2         6 );
103             }
104              
105             1;
106              
107             =head1 NAME
108              
109             Mojo::Graphite::Writer - A non-blocking Graphite metric writer using the Mojo stack
110              
111             =head1 SYNOPSIS
112              
113             my $graphite = Mojo::Graphite::Writer->new(address => 'graphite.myhost.com');
114             my $time = time;
115             $graphite->write(
116             "my.metric.one 1 $time",
117             "my.metric.two 2 $time",
118             ...
119             );
120              
121             =head1 DESCRIPTION
122              
123             L<Mojo::Graphite::Writer> is a non-blocking client for feeding data to the Graphite metrics collector.
124             This simple module is meant to aid in batching and queuing writes to the server.
125              
126             This is still a work in progress; however, the author uses it in work applications, so every effort will be made to keep the API reasonably stable while improving it where possible.
127              
128             =head1 ATTRIBUTES
129              
130             L<Mojo::Graphite::Writer> inherits all attributes from L<Mojo::Base> and implements the following new ones.
131              
132             =head2 address
133              
134             Address of the target Graphite server.
135             Required.
136              
137             =head2 batch_size
138              
139             The number of metrics to send in each write batch.
140             Default is 100.
141              
142             =head2 port
143              
144             Port of the target Graphite server.
145             Default is C<2003>.
146              
147             =head1 METHODS
148              
149             L<Mojo::Graphite::Writer> inherits all methods from L<Mojo::Base> and implements the following new ones.
150              
151             =head2 close
152              
153             Close the current connection to L</address>.
154              
155             =head2 connect
156              
157             Open a new connection to L</address>:L</port> using L<Mojo::IOLoop/client>.
158             Any additional arguments are passed through to that method.
159             Returns a L<Mojo::Promise> that resolves with the L<Mojo::IOLoop::Stream> object of the connection.
160              
161             Note that if the client is already connected, the promise is resolved again with the same stream, and will be until that stream is closed.
162             In this way, for simple connections, you may simply call L</write>, while for more complex ones, you may open the connection using this method with additional arguments if needed and then call L</write> later.
163              
164             =head2 write
165              
166             Write metrics to the L</connect>-ed graphite server.
167             Metrics are queued and written to the server in a non-blocking way, in the order that L</write> is called.
168              
169             Metrics are strings of the form C<path value time> as documented in L<"the plaintext protocol"|https://graphite.readthedocs.io/en/latest/feeding-carbon.html#the-plaintext-protocol>.
170             Each string is one metric.
171             Its line ending will be normalized, so no trailing newline is required.
172             Writes are batched in groups of size L</batch_size>.
173              
174             If the writer is not already connected, calling write will implicitly call L</connect>.
175              
176             Returns a L<Mojo::Promise> that will be resolved when the metrics passed B<in this write call> are written.
177             The promise is rejected if any write in the write queue fails, even if it is not from this write call.
178              
179             =head1 FUTURE WORK
180              
181             This module is still in early development.
182             Future work will include
183              
184             =over
185              
186             =item *
187              
188             Passing structures to L</write> and handling the formatting
189              
190             =item *
191              
192             Possibly a blocking api, though this is questionable
193              
194             =back
195              
196             =head1 SEE ALSO
197              
198             =over
199              
200             =item *
201              
202             L<https://graphite.readthedocs.io/en/latest/>
203              
204             =back
205              
206             =head1 THANKS
207              
208             This module's development was sponsored by L.
209              
210             =head1 SOURCE REPOSITORY
211              
212             L
213              
214             =head1 AUTHOR
215              
216             Joel Berger, E<lt>joel.a.berger@gmail.comE<gt>
217              
218             =head1 CONTRIBUTORS
219              
220             None yet.
221              
222             =head1 THANKS
223              
224             Mohammad S Anwar (manwar)
225              
226             =head1 COPYRIGHT AND LICENSE
227              
228             Copyright (C) 2019 by L</AUTHOR> and L</CONTRIBUTORS>
229              
230             This library is free software; you can redistribute it and/or modify it under the same terms as Perl itself.
231              
232