File Coverage

lib/Beekeeper/Service/Sinkhole/Worker.pm
Criterion Covered Total %
statement 18 59 30.5
branch 0 16 0.0
condition n/a
subroutine 6 15 40.0
pod 4 7 57.1
total 28 97 28.8


line stmt bran cond sub pod time code
1             package Beekeeper::Service::Sinkhole::Worker;
2              
3 1     1   995 use strict;
  1         2  
  1         29  
4 1     1   5 use warnings;
  1         2  
  1         43  
5              
6             our $VERSION = '0.10';
7              
8 1     1   6 use Beekeeper::Worker ':log';
  1         2  
  1         134  
9 1     1   6 use base 'Beekeeper::Worker';
  1         2  
  1         66  
10              
11 1     1   6 use Beekeeper::JSONRPC::Error;
  1         2  
  1         23  
12 1     1   4 use JSON::XS;
  1         2  
  1         673  
13              
14              
15             sub authorize_request {
               # Authorization hook for incoming requests.
               #
               # Parameters:
               #   $self - this worker instance
               #   $req  - the incoming request; its {method} entry is inspected
               #
               # Returns BKPR_REQUEST_AUTHORIZED, except for the internal method
               # '_bkpr.sinkhole.unserviced_queues' when the BKPR_SYSTEM token check
               # fails, in which case a bare return (no value) is issued.
               # NOTE(review): presumably the caller treats a missing return value
               # as "not authorized" - confirm against Beekeeper::Worker.
16 0     0 1   my ($self, $req) = @_;
17              
18 0 0         if ($req->{method} eq '_bkpr.sinkhole.unserviced_queues') {
19              
               # The internal notification is only accepted when the
               # BKPR_SYSTEM authorization token check succeeds
20 0 0         return unless $self->__has_authorization_token('BKPR_SYSTEM');
21             }
22              
23             # Every other request is let through here because it will be
               # rejected later anyway by reject_request
24 0           return BKPR_REQUEST_AUTHORIZED;
25             }
26              
27             sub on_startup {
               # Startup hook: prepares the draining state and registers the
               # handlers this worker needs.
               #
               # Steps performed, in order:
               #   1. Reset $self->{Draining} to an empty hash (one key per queue
               #      currently being drained).
               #   2. Accept '_bkpr.sinkhole.unserviced_queues' notifications and
               #      route them to on_unserviced_queues.
               #   3. Subscribe to the "msg/<bus_role>/_sync/workers/set" topic and
               #      feed each published message to on_worker_status.
               #   4. Log "Ready".
28 0     0 1   my $self = shift;
29              
               # Queues being drained, keyed by queue name
30 0           $self->{Draining} = {};
31              
32 0           $self->accept_notifications(
33             '_bkpr.sinkhole.unserviced_queues' => 'on_unserviced_queues',
34             );
35              
36 0           my $local_bus = $self->{_BUS}->{bus_role};
37              
38             # Watch the Supervisor data traffic in order to stop rejecting
39             # requests as soon as a worker handling them comes online
40              
41 0           my $topic = "msg/$local_bus/_sync/workers/set";
42              
43             $self->{_BUS}->subscribe(
44             topic => $topic,
45             on_publish => sub {
               # Called for each message published on $topic; the payload is
               # received as a reference to a JSON string.
               # NOTE(review): element [1] of the decoded array is presumably
               # the worker status record - confirm against the publisher.
46 0     0     my ($payload_ref, $properties) = @_;
47 0           $self->on_worker_status( decode_json($$payload_ref)->[1] );
48             },
49             on_suback => sub {
               # Called with the subscription outcome; a failure is only
               # logged, startup continues regardless
50 0     0     my ($success) = @_;
51 0 0         log_error "Could not subscribe to topic '$topic'" unless $success;
52             }
53 0           );
54              
55 0           log_info "Ready";
56             }
57              
58             sub on_shutdown {
               # Shutdown hook: performs no cleanup of its own, it only logs
               # that the worker has stopped.
59 0     0 1   my $self = shift;
60              
61 0           log_info "Stopped";
62             }
63              
64             sub log_handler {
               # Overrides the parent log_handler, delegating to it with the
               # 'foreground' option forced on. Returns whatever the parent
               # implementation returns.
65 0     0 1   my $self = shift;
66              
67             # Use pool's logfile
               # NOTE(review): presumably 'foreground => 1' is what makes the
               # parent handler use the pool's log instead of a per-service
               # logfile - confirm in Beekeeper::Worker.
68 0           $self->SUPER::log_handler( foreground => 1 );
69             }
70              
71             sub on_unserviced_queues {
               # Handler for '_bkpr.sinkhole.unserviced_queues' notifications:
               # starts draining every listed queue that is not already drained.
               #
               # Parameters:
               #   $self   - this worker instance
               #   $params - hash reference; $params->{queues} is dereferenced as
               #             an array of queue names
72 0     0 0   my ($self, $params) = @_;
73              
74 0           my $queues = $params->{queues};
75            
76 0           foreach my $queue (@$queues) {
77              
78             # Nothing to do if already draining $queue
79 0 0         next if $self->{Draining}->{$queue};
80              
81             # As no one is processing requests, respond to these with errors
82 0           $self->{Draining}->{$queue} = 1;
83              
84 0           my $local_bus = $self->{_BUS}->{bus_role};
85 0           log_error "Draining unserviced req/$local_bus/$queue";
86              
               # Route every method of the queue ("$queue.*") to reject_request
87 0           $self->accept_remote_calls( "$queue.*" => 'reject_request' );
88             }
89             }
90              
91             sub on_worker_status {
               # Processes a worker status record and stops draining any queue
               # that the reported worker services.
               #
               # Parameters:
               #   $self   - this worker instance
               #   $status - hash reference; {queue} is dereferenced as an array
               #             of queue names and {class} is compared against this
               #             package name
92 0     0 0   my ($self, $status) = @_;
93              
               # Ignore status records that do not list any queue
94 0 0         return unless ($status->{queue});
95              
               # Ignore status records reported by Sinkhole workers themselves
96 0 0         return if ($status->{class} eq 'Beekeeper::Service::Sinkhole::Worker');
97              
98 0           foreach my $queue (@{$status->{queue}}) {
  0            
99              
100             # Nothing to do if not draining queue
101 0 0         next unless $self->{Draining}->{$queue};
102              
103             # A worker servicing a previously unserviced queue has just come
104             # online, so do not respond with errors anymore
105 0           delete $self->{Draining}->{$queue};
106              
107 0           my $local_bus = $self->{_BUS}->{bus_role};
108 0           log_warn "Stopped draining req/$local_bus/$queue";
109              
                # Undo the "$queue.*" registration made by on_unserviced_queues
110 0           $self->stop_accepting_calls( "$queue.*" );
111             }
112             }
113              
114             sub reject_request {
                # Handler registered for the methods of drained queues: never
                # executes anything, it only answers with a JSON-RPC error.
                #
                # Parameters:
                #   $self   - this worker instance
                #   $params - request parameters (unused)
                #   $req    - the request; its mqtt_properties are checked for
                #             an 'auth' entry
                #
                # Returns a Beekeeper::JSONRPC::Error: method_not_available when
                # the request carries an 'auth' property, request_not_authorized
                # otherwise.
115 0     0 0   my ($self, $params, $req) = @_;
116              
117             # Just return a JSONRPC error response
118              
119 0 0         if ($req->mqtt_properties->{'auth'}) {
120             # When the client provided some kind of authentication, tell them the
121             # truth about the service being down. Otherwise whoever is trying to
122             # fix the issue may be misled into looking for auth/permission problems
123 0           return Beekeeper::JSONRPC::Error->method_not_available;
124             }
125             else {
126 0           return Beekeeper::JSONRPC::Error->request_not_authorized;
127             }
128             }
129              
130             1;
131              
132             __END__