File Coverage

lib/Beekeeper/Service/LogTail/Worker.pm
Criterion Covered Total %
statement 21 96 21.8
branch 0 32 0.0
condition 0 41 0.0
subroutine 7 17 41.1
pod 3 4 75.0
total 31 190 16.3


line stmt bran cond sub pod time code
1             package Beekeeper::Service::LogTail::Worker;
2              
3 1     1   1006 use strict;
  1         3  
  1         31  
4 1     1   5 use warnings;
  1         2  
  1         41  
5              
6             our $VERSION = '0.08';
7              
8 1     1   6 use Beekeeper::Worker ':log';
  1         1  
  1         162  
9 1     1   8 use base 'Beekeeper::Worker';
  1         2  
  1         72  
10              
11 1     1   6 use Beekeeper::Logger ':log_levels';
  1         2  
  1         147  
12 1     1   8 use Scalar::Util 'weaken';
  1         2  
  1         48  
13 1     1   6 use JSON::XS;
  1         2  
  1         1353  
14              
15             my @Log_buffer;
16              
17              
18             sub authorize_request {
19 0     0 1   my ($self, $req) = @_;
20              
21 0 0         return unless $self->__has_authorization_token('BKPR_ADMIN');
22              
23 0           return BKPR_REQUEST_AUTHORIZED;
24             }
25              
26             sub on_startup {
27 0     0 1   my $self = shift;
28              
29 0   0       $self->{max_entries} = $self->{config}->{buffer_entries} || 20000;
30 0   0       $self->{log_level} = $self->{config}->{log_level} || LOG_DEBUG;
31              
32 0           $self->_connect_to_all_brokers;
33              
34 0           $self->accept_remote_calls(
35             '_bkpr.logtail.tail' => 'tail',
36             );
37              
38 0           log_info "Ready";
39             }
40              
41             sub _connect_to_all_brokers {
42 0     0     my $self = shift;
43 0           weaken($self);
44              
45 0           my $own_bus = $self->{_BUS};
46 0           my $group_config = Beekeeper::Config->get_bus_group_config( bus_id => $own_bus->bus_id );
47              
48 0           $self->{_BUS_GROUP} = [];
49              
50 0           foreach my $config (@$group_config) {
51              
52 0           my $bus_id = $config->{'bus_id'};
53              
54 0 0         if ($bus_id eq $own_bus->bus_id) {
55             # Already connected to our own bus
56 0           $self->_collect_log($own_bus);
57 0           next;
58             }
59              
60 0           my $bus; $bus = Beekeeper::MQTT->new(
61             %$config,
62             bus_id => $bus_id,
63             timeout => 300,
64             on_connect => sub {
65             # Setup subscriptions
66 0     0     $self->_collect_log($bus);
67             },
68             on_error => sub {
69             # Reconnect
70 0   0 0     my $errmsg = $_[0] || ""; $errmsg =~ s/\s+/ /sg;
  0            
71 0           log_error "Connection to $bus_id failed: $errmsg";
72 0           my $delay = $self->{connect_err}->{$bus_id}++;
73             $self->{reconnect_tmr}->{$bus_id} = AnyEvent->timer(
74             after => ($delay < 10 ? $delay * 3 : 30),
75 0           cb => sub { $bus->connect },
76 0 0         );
77             },
78 0           );
79              
80 0           push @{$self->{_BUS_GROUP}}, $bus;
  0            
81              
82 0           $bus->connect;
83             }
84             }
85              
86             sub _collect_log {
87 0     0     my ($self, $bus) = @_;
88              
89             # Default logger logs to topic log/$level/$service
90              
91 0           my $max_entries = $self->{max_entries};
92 0           my $log_level = $self->{log_level};
93 0           my $worker = $self->{_WORKER};
94              
95 0           foreach my $level (1..$log_level) {
96              
97 0           my $topic = "log/$level/#";
98 0           my $req;
99              
100             $bus->subscribe(
101             topic => $topic,
102             on_publish => sub {
103             # my ($payload_ref, $mqtt_properties) = @_;
104              
105 0     0     $req = decode_json( ${$_[0]} );
  0            
106              
107 0           push @Log_buffer, $req->{params};
108              
109 0 0         shift @Log_buffer if (@Log_buffer > $max_entries);
110              
111             # Track number of collected log entries
112 0           $worker->{notif_count}++;
113             },
114             on_suback => sub {
115 0     0     my ($success, $prop) = @_;
116 0 0         die "Could not subscribe to log topic '$topic'" unless $success;
117             },
118 0           );
119             }
120             }
121              
122             sub on_shutdown {
123 0     0 1   my ($self, %args) = @_;
124              
125 0           foreach my $bus (@{$self->{_BUS_GROUP}}) {
  0            
126              
127 0 0         next unless ($bus->{is_connected});
128 0           $bus->disconnect;
129             }
130              
131 0           log_info "Stopped";
132             }
133              
134             sub tail {
135 0     0 0   my ($self, $params) = @_;
136              
137 0           foreach ('count','level','after') {
138 0 0         next unless defined $params->{$_};
139 0 0         unless ($params->{$_} =~ m/^\d+(\.\d+)?$/) {
140 0           die "Invalid parameter $_";
141             }
142             }
143              
144 0           foreach ('host','pool','service','message') {
145 0 0         next unless defined $params->{$_};
146             # Allow simple regexes
147 0 0         unless ($params->{$_} =~ m/^[\w .*+?:,()\-\[\]\\]+$/) {
148 0           die "Invalid parameter $_";
149             }
150             }
151              
152 0   0       my $count = $params->{count} || 10;
153 0           my $after = $params->{after};
154 0           my $level = $params->{level};
155              
156             # This will die when an invalid regex is provided, but that's fine
157 0 0         my $host_re = defined $params->{host} ? qr/$params->{host}/i : undef;
158 0 0         my $pool_re = defined $params->{pool} ? qr/$params->{pool}/i : undef;
159 0 0         my $svc_re = defined $params->{service} ? qr/$params->{service}/i : undef;
160 0 0         my $msg_re = defined $params->{message} ? qr/$params->{message}/i : undef;
161              
162 0           my ($entry, @filtered);
163              
164 0           for (my $i = @Log_buffer - 1; $i >= 0; $i--) {
165              
166 0           $entry = $Log_buffer[$i];
167              
168             next if (defined $level && $entry->{level} > $level ) ||
169             (defined $after && $entry->{tstamp} <= $after ) ||
170             (defined $host_re && $entry->{host} !~ $host_re ) ||
171             (defined $pool_re && $entry->{pool} !~ $pool_re ) ||
172             (defined $svc_re && $entry->{service} !~ $svc_re ) ||
173 0 0 0       (defined $msg_re && $entry->{message} !~ $msg_re );
      0        
      0        
      0        
      0        
      0        
      0        
      0        
      0        
      0        
      0        
174              
175 0           unshift @filtered, $entry;
176              
177 0 0         last if (@filtered >= $count);
178             }
179              
180 0           return \@filtered;
181             }
182              
183             1;
184              
185             __END__