File Coverage

blib/lib/Collectd/Plugins/Riemann/Query.pm
Criterion Covered Total %
statement 10 12 83.3
branch n/a
condition n/a
subroutine 4 4 100.0
pod n/a
total 14 16 87.5


line stmt bran cond sub pod time code
1             package Collectd::Plugins::Riemann::Query;
2              
3 2     2   1806 use strict;
  2         2  
  2         64  
4 2     2   7 use warnings;
  2         2  
  2         62  
5              
6 2     2   6 use Collectd qw( :all );
  2         2  
  2         549  
7 2     2   386 use Collectd::Plugins::Common qw(recurse_config);
  0            
  0            
8             use Riemann::Client;
9             use Try::Tiny;
10             #use DDP {
11             # deparse => 1,
12             # class => {
13             # expand => 'all'
14             # }
15             #};
16              
17             my %opt = (
18             Server => "127.0.0.1",
19             Port => 5555,
20             Protocol => 'TCP',
21             Type => 'gauge',
22             PluginInstance => '',
23             TypeInstance => '',
24             PluginFrom => 'plugin',
25             PluginInstanceFrom => 'plugin_instance',
26             TypeFrom => 'ds_type',
27             TypeInstanceFrom => 'type_instance',
28             );
29              
30             =head1 NAME
31              
32             Collectd::Plugins::Riemann::Query - Collectd plugin for querying Riemann Events
33              
34             =head1 SYNOPSIS
35              
36             To be used with L.
37              
38             =over 8
39              
40             =item From the collectd configfile
41              
42            
43             Globals true
44            
45              
46            
47             BaseName "Collectd::Plugins"
48             LoadPlugin "Riemann::Query"
49            
50             Host myriemann
51             Port 5555
52             Protocol TCP
53             # Static plugin metadata
54            
55             Query "tagged \"foo\" and service =~ \"bar%\""
56             Plugin foo
57             PluginInstance bar
58             Type gauge
59            
60             # plugin metadata from riemann attributes
61            
62             Query "tagged \"aggregation\""
63             PluginFrom plugin
64             PluginInstanceFrom plugin_instance
65             TypeFrom ds_type
66             TypeInstanceFrom type_instance
67            
68            
69            
70              
71             =back
72            
73             =head1 Root block configuration options
74              
75             =over 8
76              
77             =item Host STRING
78              
79             riemann host to query. defaults to localhost
80              
81             =item Port STRING
82              
83             riemann port to query. defaults to 5555
84              
85             =item Protocol STRING
86              
87             defaults to TCP
88              
89             =back
90              
91             =head1 Plugin block configuration options
92              
93             =over 8
94              
95             =item Query STRING
96              
97             Riemann Query. Mandatory
98              
99             =item Host STRING
100              
101             Static host part of collectd plugin. If unset, the host part of the riemann event will be used instead.
102              
103             =item PluginFrom/TypeFrom/PluginInstanceFrom/TypeInstanceFrom STRING
104              
105             Dynamic plugin metadata: riemann attribute to be used to set corresponding collectd metadata. service and host are also possible. Defaults to plugin, type, plugin_instance and type_instance respectively.
106              
107             =item Plugin/Type/PluginInstance/TypeInstance STRING
108              
109             Will be used instead if no *From counterpart is used or found in riemann event. Can be used as a fallback. Default for Type is gauge and for Plugin is riemann service of the event.
110              
111             =back
112              
113             =head1 SUBROUTINES
114              
115             Please refer to the L documentation.
116             Or C
117              
118             =head1 FILES
119              
120             /etc/collectd.conf
121             /etc/collectd.d/
122              
123             =head1 SEE ALSO
124              
125             Collectd, collectd-perl, collectd
126              
127             =cut
128              
129             my $plugin_name = "Riemann::Query";
130             my $r;
131              
132             plugin_register(TYPE_CONFIG, $plugin_name, 'my_config');
133             plugin_register(TYPE_READ, $plugin_name, 'my_get');
134             plugin_register(TYPE_INIT, $plugin_name, 'my_init');
135              
136             sub my_init {
137             1;
138             }
139              
140             sub my_log {
141             plugin_log shift @_, join " ", "plugin=".$plugin_name, @_;
142             }
143              
144             sub my_config {
145             my (undef,$o) = recurse_config($_[0]);
146             _validate_config($o) or return;
147             %opt = (%opt,%$o);
148             }
149              
150             sub my_get {
151             unless (ref $r eq "Riemann::Client") {
152             my_log(LOG_DEBUG, "get: initializing riemann client");
153             $r = Riemann::Client->new(
154             host => $opt{Host},
155             port => $opt{Port},
156             proto => $opt{Protocol},
157             )
158             }
159             my_log(LOG_DEBUG, "get: fetching data");
160              
161              
162             #$VAR8 = [
163             # {
164             # 'PluginInstance' => 'bar',
165             # 'Type' => 'gauge',
166             # 'Query' => 'tagged "foo" and service =~ "bar%"'
167             # },
168             # {
169             # 'TypeFrom' => 'ds_type',
170             # 'PluginInstanceFrom' => 'plugin_instance',
171             # 'Query' => 'tagged "aggregation"',
172             # 'TypeInstanceFrom' => 'type_instance',
173             # 'PluginFrom' => 'plugin'
174             # }
175             # ];
176              
177             my @Plugins;
178             if (ref $opt{Plugin} eq "ARRAY") {
179             @Plugins = @{$opt{Plugin}}
180             } elsif ( ref $opt{Plugin} eq "HASH") {
181             @Plugins = ($opt{Plugin})
182             } else {
183             my_log(LOG_ERR, "get: internal configuration problem: 'Plugin' must be hash or array");
184             return
185             }
186             my $pi = -1;
187             PLUGIN: for my $Plugin (@Plugins) {
188             $pi++;
189             my $res;
190             my $query = $Plugin -> {Query};
191             unless (defined $query) {
192             my_log(LOG_ERR, "get: no query defined for plugin[$pi]. ignoring");
193             next PLUGIN
194             }
195             try {
196             $res = $r -> query($Plugin -> {Query});
197             } catch {
198             my_log(LOG_ERR, "get: problem fetching data for query `$query`", $_);
199             return;
200             };
201             unless ($res) {
202             my_log(LOG_ERR, "get: empty message for query `$query`");
203             next PLUGIN
204             }
205             my $events = $res -> {events};
206             unless ($events) {
207             my_log(LOG_INFO, "get: query `$query` returned no events");
208             next PLUGIN
209             }
210             unless (ref $events eq "ARRAY") {
211             my_log(LOG_ERR, "get: events not array for query `$query`");
212             return;
213             }
214             for my $event (@$events) {
215             my $host = $Plugin->{Host} || $event -> {host} || "nil";
216             _sanitize($host);
217             my %plugin;
218             for my $k (qw(Type TypeInstance Plugin PluginInstance)) {
219             if (exists($Plugin->{"${k}From"})) { # metadata from riemann
220             my $attr = _get_riemann_attribute($event,$Plugin->{"${k}From"});
221             $plugin{_plug2cb($k)} = $attr if defined($attr);
222             } elsif (exists($Plugin->{$k})) { # static metadata
223             $plugin{_plug2cb($k)} = $Plugin->{$k};
224             } elsif (defined($opt{"${k}From"})) {
225             my $attr = _get_riemann_attribute($event,$opt{"${k}From"});
226             $plugin{_plug2cb($k)} = $attr if defined($attr);
227             } elsif (defined($opt{$k})) {
228             $plugin{_plug2cb($k)} = $opt{$k};
229             } else {
230             my_log(LOG_INFO, "failed to infer `${k}` for query `$query`. Will ignore query results");
231             next PLUGIN
232             }
233             }
234             for my $k (qw/Plugin Type/) {
235             unless (defined $plugin{_plug2cb($k)}) {
236             my_log(LOG_INFO, "Key `${k}` is empty for query `$query`. Will ignore query results");
237             next PLUGIN
238             }
239             }
240             my $ttl = $event -> {ttl};
241             my $interval = plugin_get_interval();
242             if ($ttl && $interval lt $ttl) {
243             my_log(LOG_INFO, "TTL ($ttl) for event returned by query `$query` is smaller than collectd interval ($interval)");
244             }
245             my $metric;
246             if ($event -> {metric_d}) {
247             $metric = $event -> {metric_d}
248             } elsif ($event -> {metric_f}) {
249             $metric = $event -> {metric_f}
250             } elsif ($event -> {metric_sint64}) {
251             $metric = $event -> {metric_sint64}
252             } else {
253             my $p_s = join(',',%plugin);
254             my_log(LOG_INFO, "get: event `$p_s` for query `$query` has no metric: ignoring");
255             next PLUGIN
256             }
257             _dispatch($host,\%plugin,$metric);
258             }
259             }
260             1;
261             }
262              
263             sub _validate_config {
264             my $o = shift;
265             unless (exists($o->{Plugin})) {
266             my_log(LOG_ERR, "missing 'Plugin' block in configuration");
267             return
268             }
269            
270             }
271              
272             sub _sanitize ($) {
273             map { s/ /_/g } @_;
274             }
275              
276             sub _get_riemann_attribute ($$) {
277             my ($evt, $key) = @_;
278             unless ($evt -> isa('Event')) {
279             my_log(LOG_ERR, "_get_riemann_attribute event is garbled");
280             return
281             }
282             unless ($key) {
283             my_log(LOG_ERR, "_get_riemann_attribute arg2 empty");
284             return
285             }
286             if ($key eq 'service' or $key eq 'host') {
287             return $evt -> {$key};
288             } else {
289             my $attributes = $evt -> {attributes};
290             if ($attributes && ref $attributes eq "ARRAY") {
291             for my $attr (@$attributes) {
292             if ($attr -> {key} eq $key) {
293             return $attr -> {value}
294             }
295             }
296             } else {
297             my_log(LOG_DEBUG, "_get_riemann_attribute no attributes for event");
298             }
299             my_log(LOG_DEBUG, "_get_riemann_attribute attribute `$key` not found for event");
300             }
301             return
302             }
303              
304             sub _dispatch ($$$) {
305             my $host = shift;
306             my $p = shift;
307             my %plugin = %$p;
308             my $metric = shift;
309             $plugin{host} = $host;
310             $plugin{values} = [ $metric ];
311             my $ret = plugin_dispatch_values(\%plugin);
312             unless ($ret) {
313             my $p_s = join(',',%plugin);
314             my_log(LOG_INFO, "dispatch error: `$p_s`") unless ($ret);
315             }
316             return $ret;
317             }
318              
319             sub _plug2cb {
320             my $p = shift;
321             my %plugin_cb_mapping = (
322             Plugin => 'plugin',
323             PluginInstance => 'plugin_instance',
324             Type => 'type',
325             TypeInstance => 'type_instance'
326             );
327             if (exists($plugin_cb_mapping{$p})) {
328             $plugin_cb_mapping{$p};
329             } else {
330             undef
331             }
332             }
333              
334             1;
335