File Coverage

blib/lib/InfluxDB/Client/Simple.pm
Criterion Covered Total %
statement 23 141 16.3
branch 0 64 0.0
condition 0 8 0.0
subroutine 8 16 50.0
pod 5 5 100.0
total 36 234 15.3


line stmt bran cond sub pod time code
1             package InfluxDB::Client::Simple;
2              
3 1     1   57123 use 5.006;
  1         4  
4 1     1   5 use strict;
  1         2  
  1         18  
5 1     1   5 use warnings;
  1         9  
  1         34  
6              
7 1     1   6 use Carp;
  1         1  
  1         58  
8 1     1   432 use IO::Socket::INET;
  1         17998  
  1         6  
9 1     1   955 use JSON;
  1         10280  
  1         6  
10 1     1   650 use LWP::UserAgent;
  1         36223  
  1         37  
11 1     1   10 use URI;
  1         2  
  1         1584  
12              
13             =head1 NAME
14              
15             InfluxDB::Client::Simple - The lightweight InfluxDB client
16              
17             =head1 VERSION
18              
19             Version 0.06
20              
21             =cut
22              
23             our $VERSION = '0.06';
24              
25             =head1 SYNOPSIS
26              
27             InfluxDB::Client::Simple provides an easy way to interact with an InfluxDB server.
28              
29             use InfluxDB::Client::Simple;
30              
31             ########################## TCP ##########################
32             my $client = InfluxDB::Client::Simple->new( host => 'server.address.com', port => 8086, protocol => 'tcp' ) or die "Can't instantiate client";
33              
34             # Check server connectivity
35             my $result = $client->ping();
36             die "No pong" unless $result;
37              
38             # You can also get the server version
39             print $result->{version};
40              
41             # Read
42             $result = $client->query('SELECT "severity_code" FROM "syslog" WHERE ("severity" = \'err\' AND "hostname" =~ /^(srv01|srv02)$/) AND time >= 1558878013531ms and time <= 1609886964827ms', database => 'grafana');
43              
44             # Write
45             $result = $client->write("testing,host=containment,repo=cadi-libs,file=testfile statement=42,pod=85", database => 'dbname');
46              
47             ########################## UDP ##########################
48             $client = InfluxDB::Client::Simple->new( host => 'server.address.com', port => 8089, protocol => 'udp', database => 'grafana' ) or die "Can't instantiate client";
49              
50             # UDP allows only write()
51             $result = $client->write("testing,host=containment,repo=cadi-libs,file=testfile statement=47,pod=89");
52              
53             =head1 WHY
54              
55             In its current state this module offers few additional features over InfluxDB::HTTP (from which it's derived)
56              
57             The only reasons why you would use this module are:
58              
59             =over
60              
61             =item *
62             Minimal dependencies (no Object::Result and its dependencies)
63              
64             =item *
65             You want to use UDP protocol for writing (WIP)
66              
67             =back
68              
69             =head1 SUBROUTINES/METHODS
70              
71             =head2 new ( [%options] )
72              
73             Constructor.
74             %otions is a hash with the following keys:
75              
76             =over
77              
78             =item *
79             database - Database name (default: 'grafana')
80              
81             =item *
82             host - Server hostname (default: 'localhost')
83              
84             =item *
85             port - Server port (default: 8086)
86              
87             =item *
88             protocol - Transport protocol 'udp' or 'tcp' (default: 'tcp')
89             Note that when using the udp protocol, the default behaviour is to avoid dying on errors.
90             (You can change that with the 'strict_udp' option)
91              
92             =item *
93             strict_udp - Boolean value to die on UDP error (false by default)
94              
95             =item *
96             timeout - Timeout value in seconds (default: 180)
97              
98             =back
99              
100             =cut
101              
102             sub new {
103 0     0 1   my $class = shift;
104 0           my %args = ( database => 'grafana',
105             host => 'localhost',
106             port => 8086,
107             protocol => 'tcp',
108             timeout => 180,
109             @_,
110             );
111 0 0         my ( $host, $port, $protocol, $strict_udp, $timeout ) = map { defined($_)?lc($_):'' } @args{ 'host', 'port', 'protocol', 'strict_udp', 'timeout' };
  0            
112              
113             my $self = { host => $host,
114             port => $port,
115             protocol => $protocol,
116             options => { database => $args{database} }
117 0           };
118              
119 0 0         if ( $protocol eq 'tcp' ) {
120 0           my $ua = LWP::UserAgent->new();
121 0           $ua->agent("InfluxDB-Client-Simple/$VERSION");
122 0           $ua->timeout($timeout);
123 0           $self->{lwp_user_agent} = $ua;
124             } else {
125 0 0         die "Unknown protocol: $protocol" unless $protocol eq "udp";
126              
127 0           my $socket = IO::Socket::INET->new( PeerAddr => "$host:$port",
128             Proto => $protocol,
129             Blocking => 0
130             );
131              
132 0 0         if ($strict_udp) {
133 0 0         die("Can't open socket: $@") unless $socket;
134             }
135              
136 0           $self->{udp} = $socket;
137             }
138              
139 0           bless $self, $class;
140              
141 0           return $self;
142             }
143              
144             =head2 ping()
145              
146             Check the server connectivity.
147              
148             Returns a hashref which evaluates to true if the connection is ok and to false otherwise.
149             The hashref has the following keys:
150              
151             =over
152              
153             =item *
154             raw - The raw response from the server
155              
156             =item *
157             error - The error message returned by the server (empty on success)
158              
159             =item *
160             version - The InfluxDB verstion returned by the server through the 'X-Influxdb-Version' header
161              
162             =back
163              
164             =cut
165              
166             sub ping {
167 0     0 1   my ($self) = @_;
168 0           my $uri = $self->_get_influxdb_http_api_uri('ping');
169 0           my $response = $self->{lwp_user_agent}->head( $uri->canonical() );
170              
171 0 0         if ( !$response->is_success() ) {
172 0           my $error = $response->message();
173 0           return { raw => $response,
174             error => $error,
175             version => undef,
176             };
177             }
178              
179 0           my $version = $response->header('X-Influxdb-Version');
180 0           return { raw => $response,
181             error => undef,
182             version => $version,
183             };
184             }
185              
186             =head2 query( $query [, %options] )
187              
188             Query the InfluxDB database using the $query passed as first parameter.
189             Optionally %options can be passed as a hash
190             Allowed keys for options are:
191              
192             =over
193              
194             =item *
195             database - The database to be queried on the InfluxDB server
196              
197             =item *
198             chunksize - The size of the chunks used for the returned data
199              
200             =item *
201             epoch - The precision format (h, m, s, ms, u, ns) for epoch timestamps
202              
203             =back
204              
205             Returns a hashref whose keys are:
206              
207             =over
208              
209             =item *
210             raw - The raw response from the server
211              
212             =item *
213             error - The error message returned by the server (empty on success)
214              
215             =item *
216             data - The InfluxDB data returned by the server
217              
218             =back
219              
220             =cut
221              
222             sub query {
223 0     0 1   my $self = shift;
224 0           my $query = shift;
225 0           my %args = ( epoch => 'ns', @_ );
226 0           my ( $database, $chunk_size, $epoch ) = @args{ 'database', 'chunk_size', 'epoch' };
227              
228 0 0         die "Missing argument 'query'" if !$query;
229 0 0         die "Argument epoch '$epoch' is not one of (h,m,s,ms,u,ns)" if $epoch !~ /^(h|m|s|ms|u|ns)$/;
230              
231 0 0         if ( ref($query) eq 'ARRAY' ) {
232 0           $query = join( ';', @$query );
233             }
234              
235 0           my $uri = $self->_get_influxdb_http_api_uri('query');
236              
237 0 0         $uri->query_form( q => $query,
    0          
    0          
238             ( $database ? ( db => $database ) : () ),
239             ( $chunk_size ? ( chunk_size => $chunk_size ) : () ),
240             ( $epoch ? ( epoch => $epoch ) : () )
241             );
242              
243 0           my $response = $self->{lwp_user_agent}->post( $uri->canonical() );
244              
245 0           chomp( my $content = $response->content() );
246              
247 0           my $error;
248 0 0         if ( $response->is_success() ) {
249 0           local $@;
250 0           my $data = eval { decode_json($content) };
  0            
251 0           $error = $@;
252              
253 0 0         if ($data) {
254 0           $error = $data->{error};
255             }
256              
257 0 0         if ( !$error ) {
258 0           $data->{request_id} = $response->header('Request-Id');
259 0           return { raw => $response,
260             data => $data,
261             error => undef,
262             };
263             }
264             } else {
265 0           $error = $content;
266             }
267              
268 0           return { raw => $response,
269             data => undef,
270             error => $error,
271             };
272             }
273              
274             =head2 write ($measurement | \@measurements, [%options])
275              
276             $measurement is the data to be send encoded according to the LineProtocol.
277              
278             %options can have the following keys:
279              
280             =over
281              
282             =item *
283             database - The database to be queried on the InfluxDB server
284              
285             =item *
286             retention_policy - The retention policy to be used (if different from the default one)
287              
288             =item *
289             precision - The precision used in the data (if diffectent from the default 'ns')
290              
291             =back
292              
293             Returns a hashref whose keys are:
294              
295             =over
296              
297             =item *
298             raw - The raw response from the server (obviously empty when using UDP)
299              
300             =item *
301             error - The error message returned by the server (empty on success)
302              
303             =back
304              
305             =cut
306              
307             sub write {
308 0     0 1   my $self = shift;
309 0           my $measurement = shift;
310 0           my %args = (%{$self->{options}}, @_);
  0            
311 0           my ( $database, $precision, $retention_policy ) = @args{ 'database', 'precision', 'retention_policy' };
312              
313 0 0         die "Missing argument 'measurement'" if !$measurement;
314 0 0         die "Missing argument 'database'" if !$database;
315 0 0 0       die "Argument precision '$precision' is set and not one of (h,m,s,ms,u,ns)" if $precision && $precision !~ /^(h|m|s|ms|u|ns)$/;
316              
317 0 0         if ( ref($measurement) eq 'ARRAY' ) {
318 0           $measurement = join( "\n", @$measurement );
319             }
320              
321 0 0         if ($self->{protocol} eq 'tcp') {
322 0           my $uri = $self->_get_influxdb_http_api_uri('write');
323              
324 0 0         $uri->query_form( db => $database,
    0          
325             ( $precision ? ( precision => $precision ) : () ),
326             ( $retention_policy ? ( rp => $retention_policy ) : () )
327             );
328              
329 0           my $response = $self->{lwp_user_agent}->post( $uri->canonical(), Content => $measurement );
330              
331 0           chomp( my $content = $response->content() );
332              
333 0 0         if ( $response->code() != 204 ) {
334 0           local $@;
335 0           my $data = eval { decode_json($content) };
  0            
336 0           my $error = $@;
337 0 0 0       $error = $data->{error} if ( !$error && $data );
338              
339 0           return { raw => $response,
340             error => $error,
341             };
342             }
343              
344 0           return { raw => $response,
345             error => undef,
346             };
347              
348             } else {
349              
350             # Udp send
351 0 0         my $bytes = $self->{udp}?$self->{udp}->send($measurement):0;
352              
353             # should be more picky here : compare $bytes with length of $measurement ?
354 0 0         return { raw => undef,
355             error => $bytes?undef:"Undefinded error while sending data (udp)",
356             };
357             }
358             }
359              
360              
361             =head2 send_data ($measurement, \%tags, \%fields, [%options])
362              
363             Write data to the influxDB after converting them into LineProtocol format.
364             (call write() underneath)
365              
366             $measurement is the name to be used for measurement
367              
368             \%tags is the tag set associated to this datapoint
369              
370             \%fields are the field set associated to this datapoint
371              
372             $timestamp is an optional timestamp value
373              
374             \%options
375              
376             %options can have the following keys:
377              
378             =over
379              
380             =item *
381             database - The database to be queried on the InfluxDB server
382              
383             =item *
384             retention_policy - The retention policy to be used (if different from the default one)
385              
386             =item *
387             precision - The precision used in the data (if diffectent from the default 'ns')
388              
389             =back
390              
391             Returns a hashref whose keys are:
392              
393             =over
394              
395             =item *
396             raw - The raw response from the server (obviously empty when using UDP)
397              
398             =item *
399             error - The error message returned by the server (empty on success)
400              
401             =back
402              
403             =cut
404              
405             sub send_data {
406 0     0 1   my $self = shift;
407 0           my $measurement = shift;
408 0           my $tags = shift;
409 0           my $fields = shift;
410 0           my %options = @_;
411              
412 0           return $self->write(_line_protocol($measurement, $tags, $fields), %options);
413              
414             }
415              
416             sub _get_influxdb_http_api_uri {
417 0     0     my ( $self, $endpoint ) = @_;
418              
419 0 0         die "Missing argument 'endpoint'" if !$endpoint;
420              
421 0           my $uri = URI->new();
422              
423 0           $uri->scheme('http');
424 0           $uri->host( $self->{host} );
425 0           $uri->port( $self->{port} );
426 0           $uri->path($endpoint);
427              
428 0           return $uri;
429             }
430              
431             # Blatantly stolen from InfluxDB::LineProtocol
432             sub _format_value {
433 0     0     my $k = shift;
434 0           my $v = shift;
435            
436 0 0         if ( $v =~ /^(-?\d+)(?:i?)$/ ) {
    0          
    0          
    0          
437 0           $v = $1 . 'i';
438             }
439             elsif ( $v =~ /^[Ff](?:ALSE|alse)?$/ ) {
440 0           $v = 'FALSE';
441             }
442             elsif ( $v =~ /^[Tt](?:RUE|rue)?$/ ) {
443 0           $v = 'TRUE';
444             }
445             elsif ( $v =~ /^-?\d+(?:\.\d+)?(?:e(?:-|\+)?\d+)?$/ ) {
446             # pass it on, no mod
447             }
448             else {
449             # string actually, but this should be quoted differently?
450 0           $v =~ s/(["\\])/\\$1/g;
451 0           $v = '"' . $v . '"';
452             }
453            
454 0           return $v;
455             }
456              
457              
458             sub _line_protocol {
459 0     0     my $measurement = shift;
460 0           my $tags = shift;
461 0           my $fields = shift;
462              
463             # sort and encode (LineProtocol) tags
464 0           my @tags;
465 0           foreach my $k ( sort keys %$tags ) {
466 0           my $v = $tags->{$k};
467 0 0         next unless defined($v);
468 0           $k =~ s/([,\s])/\\$1/g;
469 0           $v =~ s/([,\s])/\\$1/g;
470              
471 0           push( @tags, $k . '=' . $v );
472             }
473 0           my $tag_string = join( ',', @tags );
474              
475              
476             # sort and encode (LineProtocol) fields
477 0           my @fields;
478 0           foreach my $k ( sort keys %$fields ) {
479 0   0       my $v = $fields->{$k} || '';
480 0           my $esc_k = $k;
481 0           $esc_k =~ s/([,\s])/\\$1/g;
482 0           my $esc_v = _format_value($k, $v);
483              
484 0           push( @fields, $esc_k . '=' . $esc_v );
485             }
486 0           my $field_string = join( ',', @fields );
487              
488 0           return sprintf( "%s,%s %s", $measurement, $tag_string, $field_string );
489             }
490              
491             1;
492              
493             =head1 AUTHOR
494              
495             Arnaud (Arhuman) ASSAD, C<< >>
496              
497             =head1 BUGS
498              
499             Please report any bugs or feature requests to C, or through
500             the web interface at L. I will be notified, and then you'll
501             automatically be notified of progress on your bug as I make changes.
502              
503             =head1 SEE ALSO
504              
505             This module is derived from InfluxDB::HTTP.
506             This module borowed code from InfluxDB::LineProtocol
507              
508             =head1 SUPPORT
509              
510             You can find documentation for this module with the perldoc command.
511              
512             perldoc InfluxDB::Client::Simple
513              
514              
515             You can also look for information at:
516              
517             =over 4
518              
519             =item * RT: CPAN's request tracker (report bugs here)
520              
521             L
522              
523             =item * AnnoCPAN: Annotated CPAN documentation
524              
525             L
526              
527             =item * CPAN Ratings
528              
529             L
530              
531             =item * Search CPAN
532              
533             L
534              
535             =back
536              
537              
538             =head1 ACKNOWLEDGEMENTS
539              
540              
541             =head1 LICENSE AND COPYRIGHT
542              
543             This software is copyright (c) 2020 by Arnaud (Arhuman) ASSAD.
544              
545             This is free software; you can redistribute it and/or modify it under
546             the same terms as the Perl 5 programming language system itself.
547              
548              
549             =cut
550              
551             1; # End of InfluxDB::Client::Simple