File Coverage

blib/lib/Collectd/Plugins/Riemann/Query.pm
Criterion Covered Total %
statement 10 12 83.3
branch n/a
condition n/a
subroutine 4 4 100.0
pod n/a
total 14 16 87.5


line stmt bran cond sub pod time code
1             package Collectd::Plugins::Riemann::Query;
2              
3 3     3   3159 use strict;
  3         4  
  3         104  
4 3     3   13 use warnings;
  3         4  
  3         98  
5              
6 3     3   13 use Collectd qw( :all );
  3         4  
  3         818  
7 3     3   636 use Collectd::Plugins::Common qw(recurse_config);
  0            
  0            
8             use Riemann::Client;
9             use Try::Tiny;
10             #use DDP {
11             # deparse => 1,
12             # class => {
13             # expand => 'all'
14             # }
15             #};
16              
17             my %opt = (
18             Server => "127.0.0.1",
19             Port => 5555,
20             Protocol => 'TCP',
21             Type => 'gauge',
22             PluginInstance => '',
23             TypeInstance => '',
24             PluginFrom => 'plugin',
25             PluginInstanceFrom => 'plugin_instance',
26             TypeFrom => 'ds_type',
27             TypeInstanceFrom => 'type_instance',
28             );
29              
30             =head1 NAME
31              
32             Collectd::Plugins::Riemann::Query - Collectd plugin for querying Riemann Events
33              
34             =head1 SYNOPSIS
35              
36             To be used with L.
37              
38             =over 8
39              
40             =item From the collectd configfile
41              
42            
43             Globals true
44            
45              
46            
47             BaseName "Collectd::Plugins"
48             LoadPlugin "Riemann::Query"
49            
50             Host myriemann
51             Port 5555
52             Protocol TCP
53             # Static plugin metadata
54            
55             Query "tagged \"foo\" and service =~ \"bar%\""
56             Plugin foo
57             PluginInstance bar
58             Type gauge
59            
60             # plugin metadata from riemann attributes
61            
62             Query "tagged \"aggregation\""
63             PluginFrom plugin
64             PluginInstanceFrom plugin_instance
65             TypeFrom ds_type
66             TypeInstanceFrom type_instance
67            
68            
69            
70              
71             =back
72            
73             =head1 Root block configuration options
74              
75             =over 8
76              
77             =item Host STRING
78              
79             riemann host to query. defaults to localhost
80              
81             =item Port STRING
82              
83             riemann port to query. defaults to 5555
84              
85             =item Protocol STRING
86              
87             defaults to TCP
88              
89             =back
90              
91             =head1 Plugin block configuration options
92              
93             =over 8
94              
95             =item Query STRING
96              
97             Riemann Query. Mandatory
98              
99             =item Host STRING
100              
101             Static host part of collectd plugin. If unset, the host part of the riemann event will be used instead.
102              
103             =item PluginFrom/TypeFrom/PluginInstanceFrom/TypeInstanceFrom STRING
104              
105             Dynamic plugin metadata: riemann attribute to be used to set corresponding collectd metadata. service and host are also possible. Defaults to plugin, type, plugin_instance and type_instance respectively.
106              
107             =item Plugin/Type/PluginInstance/TypeInstance STRING
108              
109             Will be used instead if no *From counterpart is used or found in riemann event. Can be used as a fallback. Default for Type is gauge and for Plugin is riemann service of the event.
110              
111             =back
112              
113             =head1 SUBROUTINES
114              
115             Please refer to the L documentation.
116             Or C
117              
118             =head1 FILES
119              
120             /etc/collectd.conf
121             /etc/collectd.d/
122              
123             =head1 SEE ALSO
124              
125             Collectd, collectd-perl, collectd
126              
127             =cut
128              
129             my $plugin_name = "Riemann::Query";
130             my $r;
131              
132             plugin_register(TYPE_CONFIG, $plugin_name, 'my_config');
133             plugin_register(TYPE_READ, $plugin_name, 'my_get');
134             plugin_register(TYPE_INIT, $plugin_name, 'my_init');
135              
136             sub my_init {
137             1;
138             }
139              
140             sub my_log {
141             plugin_log shift @_, join " ", "plugin=".$plugin_name, @_;
142             }
143              
144             sub my_config {
145             my (undef,$o) = recurse_config($_[0]);
146             _validate_config($o) or return;
147             %opt = (%opt,%$o);
148             }
149              
150             sub my_get {
151             unless (ref $r eq "Riemann::Client") {
152             my_log(LOG_DEBUG, "get: initializing riemann client");
153             $r = Riemann::Client->new(
154             host => $opt{Host},
155             port => $opt{Port},
156             proto => $opt{Protocol},
157             )
158             }
159             my_log(LOG_DEBUG, "get: fetching data");
160              
161              
162             #$VAR8 = [
163             # {
164             # 'PluginInstance' => 'bar',
165             # 'Type' => 'gauge',
166             # 'Query' => 'tagged "foo" and service =~ "bar%"'
167             # },
168             # {
169             # 'TypeFrom' => 'ds_type',
170             # 'PluginInstanceFrom' => 'plugin_instance',
171             # 'Query' => 'tagged "aggregation"',
172             # 'TypeInstanceFrom' => 'type_instance',
173             # 'PluginFrom' => 'plugin'
174             # }
175             # ];
176              
177             my @Plugins;
178             if (ref $opt{Plugin} eq "ARRAY") {
179             @Plugins = @{$opt{Plugin}}
180             } elsif ( ref $opt{Plugin} eq "HASH") {
181             @Plugins = ($opt{Plugin})
182             } else {
183             my_log(LOG_ERR, "get: internal configuration problem: 'Plugin' must be hash or array");
184             return
185             }
186             my $pi = -1;
187             PLUGIN: for my $Plugin (@Plugins) {
188             $pi++;
189             my $res;
190             my $query = $Plugin -> {Query};
191             unless (defined $query) {
192             my_log(LOG_ERR, "get: no query defined for plugin[$pi]. ignoring");
193             next PLUGIN
194             }
195             try {
196             $res = $r -> query($Plugin -> {Query});
197             } catch {
198             my_log(LOG_ERR, "get: problem fetching data for query `$query`", $_);
199             return;
200             };
201             unless ($res) {
202             my_log(LOG_ERR, "get: empty message for query `$query`");
203             next PLUGIN
204             }
205             my $events = $res -> {events};
206             unless ($events) {
207             my_log(LOG_INFO, "get: query `$query` returned no events");
208             next PLUGIN
209             }
210             unless (ref $events eq "ARRAY") {
211             my_log(LOG_ERR, "get: events not array for query `$query`");
212             return;
213             }
214             for my $event (@$events) {
215             my $host = $Plugin->{Host} || $event -> {host} || "nil";
216             _sanitize($host);
217             my %plugin;
218             for my $k (qw(Type TypeInstance Plugin PluginInstance)) {
219             if (exists($Plugin->{"${k}From"})) { # metadata from riemann
220             my $ik = "${k}From";
221             my $attr = _get_riemann_attribute($event,$Plugin->{$ik});
222             if (defined($attr)) {
223             my_log(LOG_DEBUG, "Inferring `$k` using `$ik=$attr` option value for query `$query`.");
224             $plugin{_plug2cb($k)} = $attr
225             } else {
226             my_log(LOG_DEBUG, "Not inferring `$k` using `$ik` option value for query `$query`.");
227             }
228             } elsif (exists($Plugin->{$k})) { # static metadata
229             my $ik = $k;
230             my $v = $Plugin->{$ik};
231             my_log(LOG_DEBUG, "Inferring `$k` using `$ik=$v` option value for query `$query`.");
232             $plugin{_plug2cb($k)} = $v;
233             } elsif (defined($opt{"${k}From"})) {
234             my $ik = "${k}From";
235             my $attr = _get_riemann_attribute($event,$opt{$ik});
236             if (defined($attr)) {
237             my_log(LOG_DEBUG, "Inferring `$k` using default `$ik=$attr` option value for query `$query`.");
238             $plugin{_plug2cb($k)} = $attr if defined($attr);
239             } else {
240             my_log(LOG_DEBUG, "Not inferring `$k` using default `$ik` option value for query `$query`.");
241             }
242             } elsif (defined($opt{$k})) {
243             my $ik = $k;
244             my $v = $opt{$ik};
245             my_log(LOG_DEBUG, "Inferring `$k` using default `$ik=$v` option value for query `$query`.");
246             $plugin{_plug2cb($k)} = $v;
247             } else {
248             my_log(LOG_INFO, "failed to infer `${k}` for query `$query`. Will ignore query results");
249             next PLUGIN
250             }
251             }
252             for my $k (qw/Plugin Type/) {
253             unless (defined $plugin{_plug2cb($k)}) {
254             my_log(LOG_INFO, "Key `$k` is empty for query `$query`. Will ignore query results");
255             next PLUGIN
256             }
257             }
258             my $ttl = $event -> {ttl};
259             my $interval = plugin_get_interval();
260             if ($ttl && $interval gt $ttl) {
261             my_log(LOG_INFO, "TTL ($ttl) for event returned by query `$query` is smaller than collectd interval ($interval)");
262             }
263             my $metric;
264             if (exists $event -> {metric_d}) {
265             $metric = $event -> {metric_d}
266             } elsif (exists $event -> {metric_f}) {
267             $metric = $event -> {metric_f}
268             } elsif (exists $event -> {metric_sint64}) {
269             $metric = $event -> {metric_sint64}
270             } else {
271             my $p_s = join(',',%plugin);
272             my_log(LOG_INFO, "get: event `$p_s` for query `$query` has no metric: ignoring");
273             next PLUGIN
274             }
275             _dispatch($host,\%plugin,$metric);
276             }
277             }
278             1;
279             }
280              
281             sub _validate_config {
282             my $o = shift;
283             unless (exists($o->{Plugin})) {
284             my_log(LOG_ERR, "missing 'Plugin' block in configuration");
285             return
286             }
287            
288             }
289              
290             sub _sanitize ($) {
291             map { s/ /_/g } @_;
292             }
293              
294             sub _get_riemann_attribute ($$) {
295             my ($evt, $key) = @_;
296             unless ($evt -> isa('Event')) {
297             my_log(LOG_ERR, "_get_riemann_attribute event is garbled");
298             return
299             }
300             unless ($key) {
301             my_log(LOG_ERR, "_get_riemann_attribute arg2 empty");
302             return
303             }
304             if ($key eq 'service' or $key eq 'host') {
305             return $evt -> {$key};
306             } else {
307             my $attributes = $evt -> {attributes};
308             if ($attributes && ref $attributes eq "ARRAY") {
309             for my $attr (@$attributes) {
310             if ($attr -> {key} eq $key) {
311             return $attr -> {value}
312             }
313             }
314             } else {
315             my_log(LOG_DEBUG, "_get_riemann_attribute no attributes for event");
316             }
317             my_log(LOG_DEBUG, "_get_riemann_attribute attribute `$key` not found for event");
318             }
319             return
320             }
321              
322             sub _dispatch ($$$) {
323             my $host = shift;
324             my $p = shift;
325             my %plugin = %$p;
326             my $metric = shift;
327             $plugin{host} = $host;
328             $plugin{values} = [ $metric ];
329             my $ret = plugin_dispatch_values(\%plugin);
330             unless ($ret) {
331             my $p_s = join(',',%plugin);
332             my_log(LOG_INFO, "dispatch error: `$p_s`") unless ($ret);
333             }
334             return $ret;
335             }
336              
337             sub _plug2cb {
338             my $p = shift;
339             my %plugin_cb_mapping = (
340             Plugin => 'plugin',
341             PluginInstance => 'plugin_instance',
342             Type => 'type',
343             TypeInstance => 'type_instance'
344             );
345             if (exists($plugin_cb_mapping{$p})) {
346             $plugin_cb_mapping{$p};
347             } else {
348             undef
349             }
350             }
351              
352             1;
353