File Coverage

blib/lib/InfluxDB/Client/Simple.pm
Criterion Covered Total %
statement 23 145 15.8
branch 0 66 0.0
condition 0 8 0.0
subroutine 8 16 50.0
pod 5 5 100.0
total 36 240 15.0


line stmt bran cond sub pod time code
1             package InfluxDB::Client::Simple;
2              
3 1     1   68342 use 5.006;
  1         4  
4 1     1   5 use strict;
  1         2  
  1         21  
5 1     1   5 use warnings;
  1         2  
  1         46  
6              
7 1     1   6 use Carp;
  1         2  
  1         57  
8 1     1   609 use IO::Socket::INET;
  1         22160  
  1         7  
9 1     1   1260 use JSON;
  1         12916  
  1         5  
10 1     1   833 use LWP::UserAgent;
  1         45594  
  1         43  
11 1     1   9 use URI;
  1         2  
  1         1810  
12              
13             =head1 NAME
14              
15             InfluxDB::Client::Simple - The lightweight InfluxDB client
16              
17             =head1 VERSION
18              
19             Version 1.00
20              
21             =cut
22              
23             our $VERSION = '1.00';
24              
25             =head1 SYNOPSIS
26              
27             InfluxDB::Client::Simple provides an easy way to interact with an InfluxDB server.
28              
29             use InfluxDB::Client::Simple;
30              
31             ########################## TCP ##########################
32             my $client = InfluxDB::Client::Simple->new( host => 'server.address.com', port => 8086, protocol => 'tcp' ) or die "Can't instantiate client";
33              
34             # Check server connectivity
35             my $result = $client->ping();
36             die "No pong" unless $result;
37              
38             # You can also get the server version
39             print $result->{version};
40              
41             # Read
42             $result = $client->query('SELECT "severity_code" FROM "syslog" WHERE ("severity" = \'err\' AND "hostname" =~ /^(srv01|srv02)$/) AND time >= 1558878013531ms and time <= 1609886964827ms', database => 'grafana');
43              
44             # Write
45             $result = $client->write("testing,host=containment,repo=cadi-libs,file=testfile statement=42,pod=85", database => 'dbname');
46              
47             ########################## UDP ##########################
48             $client = InfluxDB::Client::Simple->new( host => 'server.address.com', port => 8089, protocol => 'udp', database => 'grafana' ) or die "Can't instantiate client";
49              
50             # UDP allows only write()
51             $result = $client->write("testing,host=containment,repo=cadi-libs,file=testfile statement=47,pod=89");
52             # or with a timestamp
53             $result = $client->write("testing,host=containment,repo=cadi-libs,file=testfile statement=47,pod=89 1610462906000");
54              
55              
56             =head1 WHY
57              
58             In its current state this module offers few additional features over InfluxDB::HTTP (from which it's derived)
59              
60             The only reasons why you would use this module are:
61              
62             =over
63              
64             =item *
65              
66             Minimal dependencies (no Object::Result and its dependencies)
67              
68             =item *
69             You want to use UDP protocol for writing
70              
71             =back
72              
73             =head1 SUBROUTINES/METHODS
74              
75             =head2 new ( [%options] )
76              
77             Constructor.
78             %otions is a hash with the following keys:
79              
80             =over
81              
82             =item *
83             database - Database name (default: 'grafana')
84              
85             =item *
86             host - Server hostname (default: 'localhost')
87              
88             =item *
89             port - Server port (default: 8086)
90              
91             =item *
92             protocol - Transport protocol 'udp' or 'tcp' (default: 'tcp')
93             Note that when using the udp protocol, the default behaviour is to avoid dying on errors.
94             (You can change that with the 'strict_udp' option)
95              
96             =item *
97             strict_udp - Boolean value to die on UDP error (false by default)
98              
99             =item *
100             timeout - Timeout value in seconds (default: 180)
101              
102             =back
103              
104             =cut
105              
106             sub new {
107 0     0 1   my $class = shift;
108 0           my %args = ( database => 'grafana',
109             host => 'localhost',
110             port => 8086,
111             protocol => 'tcp',
112             timeout => 180,
113             @_,
114             );
115 0 0         my ( $host, $port, $protocol, $strict_udp, $timeout ) = map { defined($_)?lc($_):'' } @args{ 'host', 'port', 'protocol', 'strict_udp', 'timeout' };
  0            
116              
117             my $self = { host => $host,
118             port => $port,
119             protocol => $protocol,
120             options => { database => $args{database} }
121 0           };
122              
123 0 0         if ( $protocol eq 'tcp' ) {
124 0           my $ua = LWP::UserAgent->new();
125 0           $ua->agent("InfluxDB-Client-Simple/$VERSION");
126 0           $ua->timeout($timeout);
127 0           $self->{lwp_user_agent} = $ua;
128             } else {
129 0 0         die "Unknown protocol: $protocol" unless $protocol eq "udp";
130              
131 0           my $socket = IO::Socket::INET->new( PeerAddr => "$host:$port",
132             Proto => $protocol,
133             Blocking => 0
134             );
135              
136 0 0         if ($strict_udp) {
137 0 0         die("Can't open socket: $@") unless $socket;
138             }
139              
140 0           $self->{udp} = $socket;
141             }
142              
143 0           bless $self, $class;
144              
145 0           return $self;
146             }
147              
148             =head2 ping()
149              
150             Check the server connectivity.
151              
152             Returns a hashref which evaluates to true if the connection is ok and to false otherwise.
153             The hashref has the following keys:
154              
155             =over
156              
157             =item *
158             raw - The raw response from the server
159              
160             =item *
161             error - The error message returned by the server (empty on success)
162              
163             =item *
164             version - The InfluxDB verstion returned by the server through the 'X-Influxdb-Version' header
165              
166             =back
167              
168             =cut
169              
170             sub ping {
171 0     0 1   my ($self) = @_;
172 0           my $uri = $self->_get_influxdb_http_api_uri('ping');
173 0           my $response = $self->{lwp_user_agent}->head( $uri->canonical() );
174              
175 0 0         if ( !$response->is_success() ) {
176 0           my $error = $response->message();
177 0           return { raw => $response,
178             error => $error,
179             version => undef,
180             };
181             }
182              
183 0           my $version = $response->header('X-Influxdb-Version');
184 0           return { raw => $response,
185             error => undef,
186             version => $version,
187             };
188             }
189              
190             =head2 query( $query [, %options] )
191              
192             Query the InfluxDB database using the $query passed as first parameter.
193             Optionally %options can be passed as a hash
194             Allowed keys for options are:
195              
196             =over
197              
198             =item *
199             database - The database to be queried on the InfluxDB server
200              
201             =item *
202             chunksize - The size of the chunks used for the returned data
203              
204             =item *
205             epoch - The precision format (h, m, s, ms, u, ns) for epoch timestamps (default is 'ns' for nanosecond)
206              
207             =back
208              
209             Returns a hashref whose keys are:
210              
211             =over
212              
213             =item *
214             raw - The raw response from the server
215              
216             =item *
217             error - The error message returned by the server (empty on success)
218              
219             =item *
220             data - The InfluxDB data returned by the server
221              
222             =back
223              
224             =cut
225              
226             sub query {
227 0     0 1   my $self = shift;
228 0           my $query = shift;
229 0           my %args = ( epoch => 'ns', @_ );
230 0           my ( $database, $chunk_size, $epoch ) = @args{ 'database', 'chunk_size', 'epoch' };
231              
232 0 0         die "Missing argument 'query'" if !$query;
233 0 0         die "Argument epoch '$epoch' is not one of (h,m,s,ms,u,ns)" if $epoch !~ /^(h|m|s|ms|u|ns)$/;
234              
235 0 0         if ( ref($query) eq 'ARRAY' ) {
236 0           $query = join( ';', @$query );
237             }
238              
239 0           my $uri = $self->_get_influxdb_http_api_uri('query');
240              
241 0 0         $uri->query_form( q => $query,
    0          
    0          
242             ( $database ? ( db => $database ) : () ),
243             ( $chunk_size ? ( chunk_size => $chunk_size ) : () ),
244             ( $epoch ? ( epoch => $epoch ) : () )
245             );
246              
247 0           my $response = $self->{lwp_user_agent}->post( $uri->canonical() );
248              
249 0           chomp( my $content = $response->content() );
250              
251 0           my $error;
252 0 0         if ( $response->is_success() ) {
253 0           local $@;
254 0           my $data = eval { decode_json($content) };
  0            
255 0           $error = $@;
256              
257 0 0         if ($data) {
258 0           $error = $data->{error};
259             }
260              
261 0 0         if ( !$error ) {
262 0           $data->{request_id} = $response->header('Request-Id');
263 0           return { raw => $response,
264             data => $data,
265             error => undef,
266             };
267             }
268             } else {
269 0           $error = $content;
270             }
271              
272 0           return { raw => $response,
273             data => undef,
274             error => $error,
275             };
276             }
277              
278             =head2 write ($measurement | \@measurements, [%options])
279              
280             $measurement is the data to be send encoded according to the LineProtocol.
281              
282             %options can have the following keys:
283              
284             =over
285              
286             =item *
287             database - The database to be queried on the InfluxDB server
288              
289             =item *
290             retention_policy - The retention policy to be used (if different from the default one)
291              
292             =item *
293             precision - The precision used in the data (if diffectent from the default 'ns')
294              
295             =back
296              
297             Returns a hashref whose keys are:
298              
299             =over
300              
301             =item *
302             raw - The raw response from the server (obviously empty when using UDP)
303              
304             =item *
305             error - The error message returned by the server (empty on success)
306              
307             =back
308              
309             =cut
310              
311             sub write {
312 0     0 1   my $self = shift;
313 0           my $measurement = shift;
314 0           my %args = (%{$self->{options}}, @_);
  0            
315 0           my ( $database, $precision, $retention_policy ) = @args{ 'database', 'precision', 'retention_policy' };
316              
317 0 0         die "Missing argument 'measurement'" if !$measurement;
318 0 0         die "Missing argument 'database'" if !$database;
319 0 0 0       die "Argument precision '$precision' is set and not one of (h,m,s,ms,u,ns)" if $precision && $precision !~ /^(h|m|s|ms|u|ns)$/;
320              
321 0 0         if ( ref($measurement) eq 'ARRAY' ) {
322 0           $measurement = join( "\n", @$measurement );
323             }
324              
325 0 0         if ($self->{protocol} eq 'tcp') {
326 0           my $uri = $self->_get_influxdb_http_api_uri('write');
327              
328 0 0         $uri->query_form( db => $database,
    0          
329             ( $precision ? ( precision => $precision ) : () ),
330             ( $retention_policy ? ( rp => $retention_policy ) : () )
331             );
332              
333 0           my $response = $self->{lwp_user_agent}->post( $uri->canonical(), Content => $measurement );
334              
335 0           chomp( my $content = $response->content() );
336              
337 0 0         if ( $response->code() != 204 ) {
338 0           local $@;
339 0           my $data = eval { decode_json($content) };
  0            
340 0           my $error = $@;
341 0 0 0       $error = $data->{error} if ( !$error && $data );
342              
343 0           return { raw => $response,
344             error => $error,
345             };
346             }
347              
348 0           return { raw => $response,
349             error => undef,
350             };
351              
352             } else {
353              
354             # Udp send
355 0 0         my $bytes = $self->{udp}?$self->{udp}->send($measurement):0;
356              
357             # should be more picky here : compare $bytes with length of $measurement ?
358 0 0         return { raw => undef,
359             error => $bytes?undef:"Undefinded error while sending data (udp)",
360             };
361             }
362             }
363              
364              
365             =head2 send_data ($measurement, \%tags, \%fields, [$timestamp, [%options]])
366              
367             Write data to the influxDB after converting them into LineProtocol format.
368             (call write() underneath)
369              
370             $measurement is the name to be used for measurement
371              
372             \%tags is the tag set associated to this datapoint
373              
374             \%fields are the field set associated to this datapoint
375              
376             $timestamp is an optional timestamp value expected to be in the database precision format (default is nanosecond)
377              
378             \%options are also optional
379              
380             %options can have the following keys:
381              
382             =over
383              
384             =item *
385             database - The database to be queried on the InfluxDB server
386              
387             =item *
388             retention_policy - The retention policy to be used (if different from the default one)
389              
390             =item *
391             precision - The precision used in the data (if diffectent from the default 'ns')
392              
393             =back
394              
395             Returns a hashref whose keys are:
396              
397             =over
398              
399             =item *
400             raw - The raw response from the server (obviously empty when using UDP)
401              
402             =item *
403             error - The error message returned by the server (empty on success)
404              
405             =back
406              
407             =cut
408              
409             sub send_data {
410 0     0 1   my $self = shift;
411 0           my $measurement = shift;
412 0           my $tags = shift;
413 0           my $fields = shift;
414 0           my $timestamp = shift;
415 0           my %options = @_;
416              
417 0           return $self->write( _line_protocol( $measurement, $tags, $fields, $timestamp ), %options );
418              
419             }
420              
421             sub _get_influxdb_http_api_uri {
422 0     0     my ( $self, $endpoint ) = @_;
423              
424 0 0         die "Missing argument 'endpoint'" if !$endpoint;
425              
426 0           my $uri = URI->new();
427              
428 0           $uri->scheme('http');
429 0           $uri->host( $self->{host} );
430 0           $uri->port( $self->{port} );
431 0           $uri->path($endpoint);
432              
433 0           return $uri;
434             }
435              
436             # Blatantly stolen from InfluxDB::LineProtocol
437             sub _format_value {
438 0     0     my $k = shift;
439 0           my $v = shift;
440            
441 0 0         if ( $v =~ /^(-?\d+)(?:i?)$/ ) {
    0          
    0          
    0          
442 0           $v = $1 . 'i';
443             }
444             elsif ( $v =~ /^[Ff](?:ALSE|alse)?$/ ) {
445 0           $v = 'FALSE';
446             }
447             elsif ( $v =~ /^[Tt](?:RUE|rue)?$/ ) {
448 0           $v = 'TRUE';
449             }
450             elsif ( $v =~ /^-?\d+(?:\.\d+)?(?:e(?:-|\+)?\d+)?$/ ) {
451             # pass it on, no mod
452             }
453             else {
454             # string actually, but this should be quoted differently?
455 0           $v =~ s/(["\\])/\\$1/g;
456 0           $v = '"' . $v . '"';
457             }
458            
459 0           return $v;
460             }
461              
462              
463             sub _line_protocol {
464 0     0     my $measurement = shift;
465 0           my $tags = shift;
466 0           my $fields = shift;
467 0           my $timestamp = shift;
468              
469             # sort and encode (LineProtocol) tags
470 0           my @tags;
471 0           foreach my $k ( sort keys %$tags ) {
472 0           my $v = $tags->{$k};
473 0 0         next unless defined($v);
474 0           $k =~ s/([,\s])/\\$1/g;
475 0           $v =~ s/([,\s])/\\$1/g;
476              
477 0           push( @tags, $k . '=' . $v );
478             }
479 0           my $tag_string = join( ',', @tags );
480              
481             # sort and encode (LineProtocol) fields
482 0           my @fields;
483 0           foreach my $k ( sort keys %$fields ) {
484 0   0       my $v = $fields->{$k} || '';
485 0           my $esc_k = $k;
486 0           $esc_k =~ s/([,\s])/\\$1/g;
487 0           my $esc_v = _format_value( $k, $v );
488              
489 0           push( @fields, $esc_k . '=' . $esc_v );
490             }
491 0           my $field_string = join( ',', @fields );
492              
493 0 0         if ($timestamp) {
494 0           return sprintf( "%s,%s %s %s", $measurement, $tag_string, $field_string, $timestamp );
495             } else {
496 0           return sprintf( "%s,%s %s", $measurement, $tag_string, $field_string );
497             }
498             }
499              
500             1;
501              
502             =head1 AUTHOR
503              
504             Arnaud (Arhuman) ASSAD, C<< >>
505              
506             =head1 BUGS
507              
508             Please report any bugs or feature requests to C, or through
509             the web interface at L. I will be notified, and then you'll
510             automatically be notified of progress on your bug as I make changes.
511              
512             =head1 SEE ALSO
513              
514             This module is derived from InfluxDB::HTTP.
515             This module borowed code from InfluxDB::LineProtocol
516              
517             =head1 SUPPORT
518              
519             You can find documentation for this module with the perldoc command.
520              
521             perldoc InfluxDB::Client::Simple
522              
523              
524             You can also look for information at:
525              
526             =over 4
527              
528             =item * RT: CPAN's request tracker (report bugs here)
529              
530             L
531              
532             =item * AnnoCPAN: Annotated CPAN documentation
533              
534             L
535              
536             =item * CPAN Ratings
537              
538             L
539              
540             =item * Search CPAN
541              
542             L
543              
544             =back
545              
546              
547             =head1 ACKNOWLEDGEMENTS
548              
549              
550             =head1 LICENSE AND COPYRIGHT
551              
552             This software is copyright (c) 2020 by Arnaud (Arhuman) ASSAD.
553              
554             This is free software; you can redistribute it and/or modify it under
555             the same terms as the Perl 5 programming language system itself.
556              
557              
558             =cut
559              
560             1; # End of InfluxDB::Client::Simple