File Coverage

lib/Beekeeper/Service/LogTail/Worker.pm
Criterion Covered Total %
statement 21 106 19.8
branch 0 34 0.0
condition 0 41 0.0
subroutine 7 19 36.8
pod 3 4 75.0
total 31 204 15.2


line stmt bran cond sub pod time code
1             package Beekeeper::Service::LogTail::Worker;
2              
3 1     1   977 use strict;
  1         5  
  1         29  
4 1     1   5 use warnings;
  1         2  
  1         41  
5              
6             our $VERSION = '0.10';
7              
8 1     1   852 use Beekeeper::Worker ':log';
  1         4  
  1         158  
9 1     1   11 use base 'Beekeeper::Worker';
  1         2  
  1         67  
10              
11 1     1   11 use Beekeeper::Logger ':log_levels';
  1         2  
  1         126  
12 1     1   6 use Scalar::Util 'weaken';
  1         2  
  1         40  
13 1     1   5 use JSON::XS;
  1         2  
  1         1394  
14              
15             my @Log_buffer;
16              
17              
18             sub authorize_request {
19 0     0 1   my ($self, $req) = @_;
20              
21 0 0         return unless $self->__has_authorization_token('BKPR_ADMIN');
22              
23 0           return BKPR_REQUEST_AUTHORIZED;
24             }
25              
26             sub on_startup {
27 0     0 1   my $self = shift;
28 0           weaken $self;
29              
30 0   0       $self->{max_entries} = $self->{config}->{buffer_entries} || 20000;
31 0   0       $self->{log_level} = $self->{config}->{log_level} || LOG_DEBUG;
32              
33 0           $self->_connect_to_all_brokers;
34              
35 0           $self->accept_remote_calls(
36             '_bkpr.logtail.tail' => 'tail',
37             );
38              
39             # Ping backend brokers to avoid disconnections due to inactivity
40             $self->{ping_timer} = AnyEvent->timer(
41             after => 60 * rand(),
42             interval => 60,
43 0     0     cb => sub { $self->_ping_backend_brokers },
44 0           );
45              
46 0           log_info "Ready";
47             }
48              
49             sub _connect_to_all_brokers {
50 0     0     my $self = shift;
51 0           weaken $self;
52              
53 0           my $own_bus = $self->{_BUS};
54 0           my $group_config = Beekeeper::Config->get_bus_group_config( bus_id => $own_bus->bus_id );
55              
56 0           $self->{_BUS_GROUP} = [];
57              
58 0           foreach my $config (@$group_config) {
59              
60 0           my $bus_id = $config->{'bus_id'};
61              
62 0 0         if ($bus_id eq $own_bus->bus_id) {
63             # Already connected to our own bus
64 0           $self->_collect_log($own_bus);
65 0           next;
66             }
67              
68 0           my $bus; $bus = Beekeeper::MQTT->new(
69             %$config,
70             bus_id => $bus_id,
71             timeout => 300,
72             on_error => sub {
73             # Reconnect
74 0   0 0     my $errmsg = $_[0] || ""; $errmsg =~ s/\s+/ /sg;
  0            
75 0           log_error "Connection to $bus_id failed: $errmsg";
76 0           my $delay = $self->{connect_err}->{$bus_id}++;
77             $self->{reconnect_tmr}->{$bus_id} = AnyEvent->timer(
78             after => ($delay < 10 ? $delay * 3 : 30),
79             cb => sub {
80             $bus->connect(
81             on_connack => sub {
82             # Setup subscriptions
83 0           log_warn "Reconnected to $bus_id";
84 0           $self->_collect_log($bus);
85             }
86 0           );
87             },
88 0 0         );
89             },
90 0           );
91              
92 0           push @{$self->{_BUS_GROUP}}, $bus;
  0            
93              
94             $bus->connect(
95             on_connack => sub {
96             # Setup subscriptions
97 0     0     $self->_collect_log($bus);
98             }
99 0           );
100             }
101             }
102              
103             sub _collect_log {
104 0     0     my ($self, $bus) = @_;
105              
106             # Default logger logs to topic log/$level/$service
107              
108 0           my $max_entries = $self->{max_entries};
109 0           my $log_level = $self->{log_level};
110 0           my $worker = $self->{_WORKER};
111              
112 0           foreach my $level (1..$log_level) {
113              
114 0           my $topic = "log/$level/#";
115 0           my $req;
116              
117             $bus->subscribe(
118             topic => $topic,
119             on_publish => sub {
120             # my ($payload_ref, $mqtt_properties) = @_;
121              
122 0     0     $req = decode_json( ${$_[0]} );
  0            
123              
124 0           push @Log_buffer, $req->{params};
125              
126 0 0         shift @Log_buffer if (@Log_buffer > $max_entries);
127              
128             # Track number of collected log entries
129 0           $worker->{notif_count}++;
130             },
131             on_suback => sub {
132 0     0     my ($success, $prop) = @_;
133 0 0         die "Could not subscribe to log topic '$topic'" unless $success;
134             },
135 0           );
136             }
137             }
138              
139             sub _ping_backend_brokers {
140 0     0     my $self = shift;
141              
142 0           foreach my $bus (@{$self->{_BUS_GROUP}}) {
  0            
143              
144 0 0         next unless $bus->{is_connected};
145 0           $bus->pingreq;
146             }
147             }
148              
149             sub on_shutdown {
150 0     0 1   my ($self, %args) = @_;
151              
152 0           foreach my $bus (@{$self->{_BUS_GROUP}}) {
  0            
153              
154 0 0         next unless $bus->{is_connected};
155 0           $bus->disconnect;
156             }
157              
158 0           log_info "Stopped";
159             }
160              
161             sub tail {
162 0     0 0   my ($self, $params) = @_;
163              
164 0           foreach ('count','level','after') {
165 0 0         next unless defined $params->{$_};
166 0 0         unless ($params->{$_} =~ m/^\d+(\.\d+)?$/) {
167 0           die "Invalid parameter $_";
168             }
169             }
170              
171 0           foreach ('host','pool','service','message') {
172 0 0         next unless defined $params->{$_};
173             # Allow simple regexes
174 0 0         unless ($params->{$_} =~ m/^[\w .*+?:,()\-\[\]\\]+$/) {
175 0           die "Invalid parameter $_";
176             }
177             }
178              
179 0   0       my $count = $params->{count} || 10;
180 0           my $after = $params->{after};
181 0           my $level = $params->{level};
182              
183             # This will die when an invalid regex is provided, but that's fine
184 0 0         my $host_re = defined $params->{host} ? qr/$params->{host}/i : undef;
185 0 0         my $pool_re = defined $params->{pool} ? qr/$params->{pool}/i : undef;
186 0 0         my $svc_re = defined $params->{service} ? qr/$params->{service}/i : undef;
187 0 0         my $msg_re = defined $params->{message} ? qr/$params->{message}/i : undef;
188              
189 0           my ($entry, @filtered);
190              
191 0           for (my $i = @Log_buffer - 1; $i >= 0; $i--) {
192              
193 0           $entry = $Log_buffer[$i];
194              
195             next if (defined $level && $entry->{level} > $level ) ||
196             (defined $after && $entry->{tstamp} <= $after ) ||
197             (defined $host_re && $entry->{host} !~ $host_re ) ||
198             (defined $pool_re && $entry->{pool} !~ $pool_re ) ||
199             (defined $svc_re && $entry->{service} !~ $svc_re ) ||
200 0 0 0       (defined $msg_re && $entry->{message} !~ $msg_re );
      0        
      0        
      0        
      0        
      0        
      0        
      0        
      0        
      0        
      0        
201              
202 0           unshift @filtered, $entry;
203              
204 0 0         last if (@filtered >= $count);
205             }
206              
207 0           return \@filtered;
208             }
209              
210             1;
211              
212             __END__