File Coverage

lib/Beekeeper/Worker.pm
Criterion Covered Total %
statement 45 444 10.1
branch 0 158 0.0
condition 0 67 0.0
subroutine 15 61 24.5
pod 9 21 42.8
total 69 751 9.1


line stmt bran cond sub pod time code
1             package Beekeeper::Worker;
2              
3 1     1   1312 use strict;
  1         2  
  1         32  
4 1     1   5 use warnings;
  1         3  
  1         46  
5              
6             our $VERSION = '0.09';
7              
8 1     1   7 use Beekeeper::Client ':worker';
  1         2  
  1         158  
9 1     1   8 use Beekeeper::Logger ':log_levels';
  1         2  
  1         131  
10 1     1   7 use Beekeeper::JSONRPC;
  1         3  
  1         21  
11              
12 1     1   5 use JSON::XS;
  1         1  
  1         42  
13 1     1   5 use Time::HiRes;
  1         2  
  1         7  
14 1     1   68 use Sys::Hostname;
  1         2  
  1         38  
15 1     1   5 use Digest::MD5 'md5_base64';
  1         2  
  1         33  
16 1     1   5 use Scalar::Util 'blessed';
  1         2  
  1         40  
17 1     1   5 use Carp;
  1         2  
  1         45  
18              
19 1     1   5 use constant COMPILE_ERROR_EXIT_CODE => 99;
  1         2  
  1         60  
20 1     1   5 use constant BKPR_REQUEST_AUTHORIZED => int(rand(90000000)+10000000);
  1         2  
  1         50  
21              
22 1     1   5 use Exporter 'import';
  1         2  
  1         2321  
23              
24             our @EXPORT = qw( BKPR_REQUEST_AUTHORIZED );
25              
26             our @EXPORT_OK = qw(
27             log_fatal
28             log_alert
29             log_critical
30             log_error
31             log_warn
32             log_warning
33             log_notice
34             log_info
35             log_debug
36             log_trace
37             log_level
38             );
39              
40             our %EXPORT_TAGS = ('log' => [ @EXPORT_OK, @EXPORT ]);
41              
42             our $Logger = sub { warn(@_) }; # redefined later by __init_logger
43             our $LogLevel = LOG_INFO;
44              
45 0 0   0 0   sub log_fatal (@) { $LogLevel >= LOG_FATAL && $Logger->( LOG_FATAL, @_ ) }
46 0 0   0 0   sub log_alert (@) { $LogLevel >= LOG_ALERT && $Logger->( LOG_ALERT, @_ ) }
47 0 0   0 0   sub log_critical (@) { $LogLevel >= LOG_CRIT && $Logger->( LOG_CRIT, @_ ) }
48 0 0   0 0   sub log_error (@) { $LogLevel >= LOG_ERROR && $Logger->( LOG_ERROR, @_ ) }
49 0 0   0 0   sub log_warn (@) { $LogLevel >= LOG_WARN && $Logger->( LOG_WARN, @_ ) }
50 0 0   0 0   sub log_warning (@) { $LogLevel >= LOG_WARN && $Logger->( LOG_WARN, @_ ) }
51 0 0   0 0   sub log_notice (@) { $LogLevel >= LOG_NOTICE && $Logger->( LOG_NOTICE, @_ ) }
52 0 0   0 0   sub log_info (@) { $LogLevel >= LOG_INFO && $Logger->( LOG_INFO, @_ ) }
53 0 0   0 0   sub log_debug (@) { $LogLevel >= LOG_DEBUG && $Logger->( LOG_DEBUG, @_ ) }
54 0 0   0 0   sub log_trace (@) { $LogLevel >= LOG_TRACE && $Logger->( LOG_TRACE, @_ ) }
55 0 0   0 0   sub log_level (;$) { $LogLevel = shift if scalar @_; return $LogLevel }
  0            
56              
57             our $BUSY_SINCE; *BUSY_SINCE = \$Beekeeper::MQTT::BUSY_SINCE;
58             our $BUSY_TIME; *BUSY_TIME = \$Beekeeper::MQTT::BUSY_TIME;
59              
60             our $REPORT_STATUS_PERIOD = 5;
61             our $UNSUBSCRIBE_LINGER = 2;
62              
63             my %AUTH_TOKENS;
64             my $JSON;
65              
66              
67             sub new {
68 0     0 0   my ($class, %args) = @_;
69              
70             # Parameters passed by WorkerPool->spawn_worker
71            
72 0           my $self = {
73             _WORKER => undef,
74             _CLIENT => undef,
75             _BUS => undef,
76             _LOGGER => undef,
77             };
78              
79 0           bless $self, $class;
80              
81             $self->{_WORKER} = {
82             parent_pid => $args{'parent_pid'},
83             foreground => $args{'foreground'}, # --foreground option
84             debug => $args{'debug'}, # --debug option
85             bus_config => $args{'bus_config'}, # content of bus.config.json
86             pool_config => $args{'pool_config'}, # content of pool.config.json
87             pool_id => $args{'pool_id'},
88             bus_id => $args{'bus_id'},
89 0           config => $args{'config'},
90             hostname => hostname(),
91             stop_cv => undef,
92             callbacks => {},
93             task_queue_high => [],
94             task_queue_low => [],
95             queued_tasks => 0,
96             in_progress => 0,
97             last_report => 0,
98             call_count => 0,
99             notif_count => 0,
100             error_count => 0,
101             busy_time => 0,
102             };
103              
104 0           $JSON = JSON::XS->new;
105 0           $JSON->utf8; # encode result as utf8
106 0           $JSON->allow_blessed; # encode blessed references as null
107 0           $JSON->convert_blessed; # use TO_JSON methods to serialize objects
108              
109 0 0 0       if (defined $SIG{TERM} && $SIG{TERM} eq 'DEFAULT') {
110             # Stop working gracefully when TERM signal is received
111 0     0     $SIG{TERM} = sub { $self->stop_working };
  0            
112             }
113              
114 0 0 0       if (defined $SIG{INT} && $SIG{INT} eq 'DEFAULT' && $args{'foreground'}) {
      0        
115             # In foreground mode also stop working gracefully when INT signal is received
116 0     0     $SIG{INT} = sub { $self->stop_working };
  0            
117             }
118              
119 0           eval {
120              
121             # Init logger as soon as possible
122 0           $self->__init_logger;
123              
124             # Connect to broker
125 0           $self->__init_client;
126              
127             # Pass broker connection to logger
128 0 0         $self->{_LOGGER}->{_BUS} = $self->{_BUS} if (exists $self->{_LOGGER}->{_BUS});
129              
130 0           $self->__init_auth_tokens;
131              
132 0           $self->__init_worker;
133             };
134              
135 0 0         if ($@) {
136 0           log_fatal "Worker died while initialization: $@";
137 0           log_fatal "$class could not be started";
138 0           CORE::exit( COMPILE_ERROR_EXIT_CODE );
139             }
140              
141 0           return $self;
142             }
143              
144             sub __init_auth_tokens {
145 0     0     my ($self) = @_;
146              
147             # Using a hashing function makes harder to access the wrong worker pool by mistake,
148             # but it is not an effective access restriction: anyone with access to the backend
149             # bus credentials can easily inspect and clone auth data tokens
150              
151 0           my $salt = $self->{_CLIENT}->{auth_salt};
152              
153 0           $AUTH_TOKENS{'BKPR_SYSTEM'} = md5_base64('BKPR_SYSTEM'. $salt);
154 0           $AUTH_TOKENS{'BKPR_ADMIN'} = md5_base64('BKPR_ADMIN' . $salt);
155 0           $AUTH_TOKENS{'BKPR_ROUTER'} = md5_base64('BKPR_ROUTER'. $salt);
156             }
157              
158             sub __has_authorization_token {
159 0     0     my ($self, $auth_level) = @_;
160              
161 0           my $auth_data = $self->{_CLIENT}->{auth_data};
162              
163 0 0 0       return 0 unless $auth_data && $auth_level;
164 0 0         return 0 unless exists $AUTH_TOKENS{$auth_level};
165 0 0         return 0 unless $AUTH_TOKENS{$auth_level} eq $auth_data;
166              
167 0           return 1;
168             }
169              
170             sub __init_logger {
171 0     0     my $self = shift;
172              
173             # Honor --debug command line option and 'debug' config option from pool.config.json
174 0 0 0       $LogLevel = LOG_DEBUG if $self->{_WORKER}->{debug} || $self->{_WORKER}->{config}->{debug};
175              
176 0           my $log_handler = $self->log_handler;
177 0           $self->{_LOGGER} = $log_handler;
178              
179             $Logger = sub {
180             # ($level, @messages) = @_
181 0     0     $log_handler->log(@_);
182 0           };
183              
184 0     0     $SIG{__WARN__} = sub { $Logger->( LOG_WARN, @_ ) };
  0            
185             }
186              
187             sub log_handler {
188 0     0 1   my $self = shift;
189              
190             Beekeeper::Logger->new(
191             worker_class => ref $self,
192             foreground => $self->{_WORKER}->{foreground},
193             log_file => $self->{_WORKER}->{config}->{log_file},
194             host => $self->{_WORKER}->{hostname},
195             pool => $self->{_WORKER}->{pool_id},
196             _BUS => $self->{_BUS},
197             @_
198 0           );
199             }
200              
201             sub __init_client {
202 0     0     my $self = shift;
203              
204 0           my $bus_id = $self->{_WORKER}->{bus_id};
205 0           my $config = $self->{_WORKER}->{bus_config}->{$bus_id};
206              
207             my $client = Beekeeper::Client->new(
208             %$config,
209             timeout => 0, # retry forever
210             on_error => sub {
211 0   0 0     my $errmsg = $_[0] || ""; $errmsg =~ s/\s+/ /sg;
  0            
212 0           log_fatal "Connection to $bus_id failed: $errmsg";
213 0           $self->stop_working;
214             },
215 0           );
216              
217 0           $self->{_CLIENT} = $client->{_CLIENT};
218 0           $self->{_BUS} = $client->{_BUS};
219              
220 0           $Beekeeper::Client::singleton = $self;
221             }
222              
223             sub __init_worker {
224 0     0     my $self = shift;
225              
226 0           $self->on_startup;
227              
228 0           $self->__report_status;
229              
230 0           AnyEvent->now_update;
231              
232             $self->{_WORKER}->{report_status_timer} = AnyEvent->timer(
233             after => rand( $REPORT_STATUS_PERIOD ),
234             interval => $REPORT_STATUS_PERIOD,
235 0     0     cb => sub { $self->__report_status },
236 0           );
237             }
238              
239              
240             sub on_startup {
241             # Placeholder, intended to be overrided
242 0     0 1   my $class = ref $_[0];
243 0           log_fatal "Worker class $class doesn't define on_startup() method";
244             }
245              
246       0 1   sub on_shutdown {
247             # Placeholder, can be overrided
248             }
249              
250             sub authorize_request {
251             # Placeholder, must to be overrided
252 0     0 1   my $class = ref $_[0];
253 0           log_fatal "Worker class $class doesn't define authorize_request() method";
254 0           return undef; # do not authorize
255             }
256              
257              
258             sub accept_notifications {
259 0     0 1   my ($self, %args) = @_;
260              
261 0           my $worker = $self->{_WORKER};
262 0           my $callbacks = $worker->{callbacks};
263              
264 0           my ($file, $line) = (caller)[1,2];
265 0           my $at = "at $file line $line\n";
266              
267 0           foreach my $fq_meth (keys %args) {
268              
269 0 0         $fq_meth =~ m/^ ( [\w-]+ (?: \.[\w-]+ )* )
270             \. ( [\w-]+ | \* ) $/x or die "Invalid notification method '$fq_meth' $at";
271              
272 0           my ($service, $method) = ($1, $2);
273              
274 0           my $callback = $self->__get_cb_coderef($fq_meth, $args{$fq_meth});
275              
276 0 0         die "Already accepting notifications '$fq_meth' $at" if exists $callbacks->{"msg.$fq_meth"};
277 0           $callbacks->{"msg.$fq_meth"} = $callback;
278              
279 0           my $local_bus = $self->{_BUS}->{bus_role};
280              
281 0           my $topic = "msg/$local_bus/$service/$method";
282 0           $topic =~ tr|.*|/#|;
283              
284             $self->{_BUS}->subscribe(
285             topic => $topic,
286             on_publish => sub {
287             # ($payload_ref, $properties) = @_;
288              
289             # Enqueue notification
290 0     0     push @{$worker->{task_queue_high}}, [ @_ ];
  0            
291              
292 0 0         unless ($worker->{queued_tasks}) {
293 0           $worker->{queued_tasks} = 1;
294 0           AnyEvent::postpone { $self->__drain_task_queue };
  0            
295             }
296             },
297             on_suback => sub {
298 0     0     my ($success, $prop) = @_;
299 0 0         die "Could not subscribe to topic '$topic' $at" unless $success;
300             }
301 0           );
302             }
303             }
304              
305             sub __get_cb_coderef {
306 0     0     my ($self, $method, $callback) = @_;
307              
308 0 0 0       if (ref $callback eq 'CODE') {
    0          
309             # Already a coderef
310 0           return $callback;
311             }
312             elsif (!ref($callback) && $self->can($callback)) {
313             # Return a reference to given method
314 1     1   10 no strict 'refs';
  1         9  
  1         4314  
315 0           my $class = ref $self;
316 0           return \&{"${class}::${callback}"};
  0            
317             }
318             else {
319 0           my ($file, $line) = (caller(1))[1,2];
320 0           my $at = "at $file line $line\n";
321 0           die "Invalid handler '$callback' for '$method' $at";
322             }
323             }
324              
325              
326             sub accept_remote_calls {
327 0     0 1   my ($self, %args) = @_;
328              
329 0           my $worker = $self->{_WORKER};
330 0           my $callbacks = $worker->{callbacks};
331 0           my %subscribed_to;
332              
333 0           my ($file, $line) = (caller)[1,2];
334 0           my $at = "at $file line $line\n";
335              
336 0           foreach my $fq_meth (keys %args) {
337              
338 0 0         $fq_meth =~ m/^ ( [\w-]+ (?: \.[\w-]+ )* )
339             \. ( [\w-]+ | \* ) $/x or die "Invalid remote call method '$fq_meth' $at";
340              
341 0           my ($service, $method) = ($1, $2);
342              
343 0           my $callback = $self->__get_cb_coderef($fq_meth, $args{$fq_meth});
344              
345 0 0         die "Already accepting remote calls '$fq_meth' $at" if exists $callbacks->{"req.$fq_meth"};
346 0           $callbacks->{"req.$fq_meth"} = $callback;
347              
348 0 0         next if $subscribed_to{$service};
349 0           $subscribed_to{$service} = 1;
350              
351 0 0         if (keys %subscribed_to == 2) {
352 0           log_warn "Running multiple services within a single worker hurts load balancing $at";
353             }
354              
355 0           my $local_bus = $self->{_BUS}->{bus_role};
356              
357 0           my $topic = "\$share/BKPR/req/$local_bus/$service";
358 0           $topic =~ tr|.*|/#|;
359              
360             $self->{_BUS}->subscribe(
361             topic => $topic,
362             maximum_qos => 1,
363             on_publish => sub {
364             # ($payload_ref, $mqtt_properties) = @_;
365              
366             # Enqueue request
367 0     0     push @{$worker->{task_queue_low}}, [ @_ ];
  0            
368              
369 0 0         unless ($worker->{queued_tasks}) {
370 0           $worker->{queued_tasks} = 1;
371 0           AnyEvent::postpone { $self->__drain_task_queue };
  0            
372             }
373             },
374             on_suback => sub {
375 0     0     my ($success, $prop) = @_;
376 0 0         die "Could not subscribe to topic '$topic' $at" unless $success;
377             }
378 0           );
379             }
380             }
381              
382             my $_TASK_QUEUE_DEPTH = 0;
383              
384             sub __drain_task_queue {
385 0     0     my $self = shift;
386              
387             # Ensure that draining does not recurse
388 0 0         Carp::confess "Unexpected task queue processing recursion" if $_TASK_QUEUE_DEPTH;
389 0           $_TASK_QUEUE_DEPTH++;
390              
391 0           my $timing_tasks;
392              
393 0 0         unless (defined $BUSY_SINCE) {
394             # Measure time elapsed while processing requests
395 0           $BUSY_SINCE = Time::HiRes::time;
396 0           $timing_tasks = 1;
397             }
398              
399 0           my $worker = $self->{_WORKER};
400 0           my $client = $self->{_CLIENT};
401 0           my $task;
402              
403             # When requests or notifications are received these are not executed immediately
404             # because that could happen in the middle of the process of another request,
405             # so these tasks get queued until the worker is ready to process the next one.
406             #
407             # Callbacks are executed here, exception handling is done here, responses are
408             # sent back here. This is one of the most important methods of the framework.
409             #
410             # Notifications have higher priority and are processed first.
411              
412             DRAIN: {
413              
414 0           while ($task = shift @{$worker->{task_queue_high}}) {
  0            
  0            
415              
416             ## Notification
417              
418 0           my ($payload_ref, $mqtt_properties) = @$task;
419              
420 0           $worker->{notif_count}++;
421              
422 0           eval {
423              
424 0           my $request = decode_json($$payload_ref);
425              
426 0 0 0       unless (ref $request eq 'HASH' && $request->{jsonrpc} eq '2.0') {
427 0           log_error "Received invalid JSON-RPC 2.0 notification";
428 0           return;
429             }
430              
431 0           bless $request, 'Beekeeper::JSONRPC::Notification';
432 0           $request->{_mqtt_properties} = $mqtt_properties;
433              
434 0           my $method = $request->{method};
435              
436 0 0 0       unless (defined $method && $method =~ m/^([\.\w-]+)\.([\w-]+)$/) {
437 0           log_error "Received notification with invalid method '$method'";
438 0           return;
439             }
440              
441             my $cb = $worker->{callbacks}->{"msg.$1.$2"} ||
442 0   0       $worker->{callbacks}->{"msg.$1.*"};
443              
444 0           local $client->{caller_id} = $mqtt_properties->{'clid'};
445 0           local $client->{caller_addr} = $mqtt_properties->{'addr'};
446 0           local $client->{auth_data} = $mqtt_properties->{'auth'};
447              
448 0 0 0       unless (($self->authorize_request($request) || "") eq BKPR_REQUEST_AUTHORIZED) {
449 0           log_error "Notification '$method' was not authorized";
450 0           return;
451             }
452              
453 0 0         unless ($cb) {
454 0           log_error "No handler found for received notification '$method'";
455 0           return;
456             }
457              
458 0           $cb->($self, $request->{params}, $request);
459             };
460              
461 0 0         if ($@) {
462             # Got an exception while processing message
463 0           log_error $@;
464 0           $worker->{error_count}++;
465             }
466             }
467              
468 0 0         if ($task = shift @{$worker->{task_queue_low}}) {
  0            
469              
470             ## RPC Call
471              
472 0           my ($payload_ref, $mqtt_properties) = @$task;
473              
474 0           $worker->{call_count}++;
475 0           my ($request, $request_id, $result, $response);
476              
477 0           $result = eval {
478              
479 0           $request = decode_json($$payload_ref);
480              
481 0 0 0       unless (ref $request eq 'HASH' && $request->{jsonrpc} eq '2.0') {
482 0           log_error "Received invalid JSON-RPC 2.0 request";
483 0           die Beekeeper::JSONRPC::Error->invalid_request;
484             }
485              
486 0           $request_id = $request->{id};
487 0           my $method = $request->{method};
488              
489 0           bless $request, 'Beekeeper::JSONRPC::Request';
490 0           $request->{_mqtt_properties} = $mqtt_properties;
491              
492 0 0 0       unless (defined $method && $method =~ m/^([\.\w-]+)\.([\w-]+)$/) {
493 0           log_error "Received request with invalid method '$method'";
494 0           die Beekeeper::JSONRPC::Error->method_not_found;
495             }
496              
497             my $cb = $worker->{callbacks}->{"req.$1.$2"} ||
498 0   0       $worker->{callbacks}->{"req.$1.*"};
499              
500 0           local $client->{caller_id} = $mqtt_properties->{'clid'};
501 0           local $client->{caller_addr} = $mqtt_properties->{'addr'};
502 0           local $client->{auth_data} = $mqtt_properties->{'auth'};
503              
504 0 0 0       unless (($self->authorize_request($request) || "") eq BKPR_REQUEST_AUTHORIZED) {
505 0           log_error "Request '$method' was not authorized";
506 0           die Beekeeper::JSONRPC::Error->request_not_authorized;
507             }
508              
509 0 0         unless ($cb) {
510 0           log_error "No handler found for received request '$method'";
511 0           die Beekeeper::JSONRPC::Error->method_not_found;
512             }
513              
514             # Execute method handler
515 0           $cb->($self, $request->{params}, $request);
516             };
517              
518 0 0 0       if ($@) {
    0          
    0          
519             # Got an exception while executing method handler
520 0 0 0       if (blessed($@) && $@->isa('Beekeeper::JSONRPC::Error')) {
521             # Handled exception
522 0           $response = $@;
523 0           $worker->{error_count}++;
524             }
525             else {
526             # Unhandled exception
527 0           log_error $@;
528 0           $worker->{error_count}++;
529 0           $response = Beekeeper::JSONRPC::Error->server_error;
530             # Sending exact error to caller is very handy, but it is also a security risk
531 0 0         $response->{error}->{data} = $@ if $worker->{debug};
532 0           $worker->{error_count}++;
533             }
534             }
535             elsif (blessed($result) && $result->isa('Beekeeper::JSONRPC::Error')) {
536             # Explicit error response
537 0           $response = $result;
538 0           $worker->{error_count}++;
539             }
540             elsif ($request->{_async_response}) {
541             # Response was deferred and will be sent later
542 0           $worker->{in_progress}++;
543 0           $request->{_worker} = $self;
544             }
545             else {
546             # Build a success response
547 0           $response = {
548             jsonrpc => '2.0',
549             result => $result,
550             };
551             }
552              
553 0 0 0       if (defined $request_id && defined $response) {
554              
555             # Send back response to caller
556              
557 0           $response->{id} = $request_id;
558              
559 0           my $json = eval { $JSON->encode( $response ) };
  0            
560              
561 0 0         if ($@) {
562             # Probably response contains blessed references
563 0           log_error "Couldn't serialize response as JSON: $@";
564 0           $response = Beekeeper::JSONRPC::Error->server_error;
565 0           $response->{id} = $request_id;
566 0           $json = $JSON->encode( $response );
567             }
568              
569             # Request is acknowledged as received just after sending the response. So, if
570             # the process is abruptly interrupted here, the broker will send the request to
571             # another worker and it will be executed twice (acking the request just before
572             # processing it may cause unprocessed requests or undelivered responses)
573              
574             $self->{_BUS}->publish(
575             topic => $mqtt_properties->{'response_topic'},
576 0           addr => $mqtt_properties->{'addr'},
577             payload => \$json,
578             buffer_id => 'response',
579             );
580              
581 0 0         if (exists $mqtt_properties->{'packet_id'}) {
582              
583             $self->{_BUS}->puback(
584 0           packet_id => $mqtt_properties->{'packet_id'},
585             buffer_id => 'response',
586             );
587             }
588             else {
589             # Should not happen (clients must publish with QoS 1)
590 0           log_warn "Request published with QoS 0 to topic " . $mqtt_properties->{'topic'};
591             }
592              
593 0           $self->{_BUS}->flush_buffer( buffer_id => 'response' );
594             }
595             else {
596              
597             # Acknowledge requests that doesn't send a response right now (fire & forget calls
598             # and requests handled asynchronously), signaling the broker to send more requests
599              
600             $self->{_BUS}->puback(
601 0           packet_id => $mqtt_properties->{'packet_id'},
602             );
603             }
604             }
605              
606 0 0 0       redo DRAIN if (@{$worker->{task_queue_high}} || @{$worker->{task_queue_low}});
  0            
  0            
607              
608             # Execute tasks postponed until task queue is empty
609 0 0         if (exists $worker->{postponed}) {
610 0           $_->() foreach @{$worker->{postponed}};
  0            
611 0           delete $worker->{postponed};
612             }
613             }
614              
615 0           $_TASK_QUEUE_DEPTH--;
616              
617 0 0         if (defined $timing_tasks) {
618 0           $BUSY_TIME += Time::HiRes::time - $BUSY_SINCE;
619 0           undef $BUSY_SINCE;
620             }
621              
622 0           $worker->{queued_tasks} = 0;
623             }
624              
625             sub __send_response {
626 0     0     my ($self, $request, $result) = @_;
627              
628             # Send back async response to caller
629              
630 0           my ($timing_tasks, $response);
631              
632 0           $self->{_WORKER}->{in_progress}--;
633              
634             # fire & forget calls doesn't expect responses
635 0 0         return unless defined $request->{id};
636              
637 0 0         unless (defined $BUSY_SINCE) {
638 0           $BUSY_SINCE = Time::HiRes::time;
639 0           $timing_tasks = 1;
640             }
641              
642 0 0 0       if (blessed($result) && $result->isa('Beekeeper::JSONRPC::Error')) {
643             # Explicit error response
644 0           $response = $result;
645 0           $self->{_WORKER}->{error_count}++;
646             }
647             else {
648             # Build a success response
649 0           $response = {
650             jsonrpc => '2.0',
651             result => $result,
652             };
653             }
654              
655 0           $response->{id} = $request->{id};
656              
657 0           local $@;
658 0           my $json = eval { $JSON->encode( $response ) };
  0            
659              
660 0 0         if ($@) {
661             # Probably response contains blessed references
662 0           log_error "Couldn't serialize response as JSON: $@";
663 0           $response = Beekeeper::JSONRPC::Error->server_error;
664 0           $response->{id} = $request->{id};
665 0           $json = $JSON->encode( $response );
666 0           $self->{_WORKER}->{error_count}++;
667             }
668              
669             $self->{_BUS}->publish(
670             topic => $request->{_mqtt_properties}->{'response_topic'},
671 0           addr => $request->{_mqtt_properties}->{'addr'},
672             payload => \$json,
673             );
674              
675 0 0         if (defined $timing_tasks) {
676 0           $BUSY_TIME += Time::HiRes::time - $BUSY_SINCE;
677 0           undef $BUSY_SINCE;
678             }
679             }
680              
681              
682             sub stop_accepting_notifications {
683 0     0 1   my ($self, @methods) = @_;
684              
685 0           my ($file, $line) = (caller)[1,2];
686 0           my $at = "at $file line $line\n";
687              
688 0 0         die "No method specified $at" unless @methods;
689              
690 0           foreach my $fq_meth (@methods) {
691              
692 0 0         $fq_meth =~ m/^ ( [\w-]+ (?: \.[\w-]+ )* )
693             \. ( [\w-]+ | \* ) $/x or die "Invalid method '$fq_meth' $at";
694              
695 0           my ($service, $method) = ($1, $2);
696              
697 0           my $worker = $self->{_WORKER};
698              
699 0 0         unless (defined $worker->{callbacks}->{"msg.$fq_meth"}) {
700 0           log_warn "Not previously accepting notifications '$fq_meth' $at";
701 0           next;
702             }
703              
704 0           my $local_bus = $self->{_BUS}->{bus_role};
705              
706 0           my $topic = "msg/$local_bus/$service/$method";
707 0           $topic =~ tr|.*|/#|;
708              
709             # Cannot remove callbacks right now, as new notifications could be in flight or be
710             # already queued. We must wait for unsubscription completion, and then until the
711             # notification queue is empty to ensure that all received ones were processed. And
712             # even then wait a bit more, as some brokers may send messages *after* unsubscription.
713             my $postpone = sub {
714              
715 0     0     my $unsub_tmr; $unsub_tmr = AnyEvent->timer(
716             after => $UNSUBSCRIBE_LINGER, cb => sub {
717              
718 0           delete $worker->{callbacks}->{"msg.$fq_meth"};
719 0           undef $unsub_tmr;
720             }
721 0           );
722 0           };
723              
724             $self->{_BUS}->unsubscribe(
725             topic => $topic,
726             on_unsuback => sub {
727 0     0     my ($success, $prop) = @_;
728              
729 0 0         log_error "Could not unsubscribe from topic '$topic' $at" unless $success;
730              
731 0   0       my $postponed = $worker->{postponed} ||= [];
732 0           push @$postponed, $postpone;
733              
734 0           AnyEvent::postpone { $self->__drain_task_queue };
  0            
735             }
736 0           );
737             }
738             }
739              
740              
# Stop accepting remote calls for one or more entire service classes.
#
# Arguments:
#   @methods - one or more strings of the form 'service.class.*'. Individual
#              methods cannot be rejected, only whole service classes.
#
# Dies (reporting the location of the caller) when no method is specified,
# when a method name is malformed, or when an individual method is requested.
# Logs a warning and skips the entry when the service was not being accepted.
#
# The unsubscription is asynchronous: the callbacks of each service are removed
# only after the broker acknowledged the unsubscription, the postponed task was
# run by __drain_task_queue, and $UNSUBSCRIBE_LINGER seconds have elapsed.
# When the worker is shutting down this also signals the stop condvar.
sub stop_accepting_calls {
    my ($self, @methods) = @_;

    my ($file, $line) = (caller)[1,2];
    my $at = "at $file line $line\n";

    die "No method specified $at" unless @methods;

    foreach my $fq_meth (@methods) {

        $fq_meth =~ m/^  ( [\w-]+ (?: \.[\w-]+ )* )
                      \. ( [\w-]+ | \* ) $/x or die "Invalid remote call method '$fq_meth' $at";

        my ($service, $method) = ($1, $2);

        unless ($method eq '*') {
            # Known limitation. As all calls for an entire service class are received
            # through a single MQTT subscription (in order to load balance them), it is
            # not possible to reject a single method. A workaround is to use a different
            # class for each method that need to be individually rejected.
            die "Cannot stop accepting individual methods, only '$service.*' is allowed $at";
        }

        my $worker    = $self->{_WORKER};
        my $callbacks = $worker->{callbacks};

        # Select only the callbacks of exactly this service: 'req.<service>.<method>'.
        # The match is anchored on both sides so that stopping 'foo' neither touches
        # 'foo-bar' (a word boundary exists before '-') nor the distinct service
        # 'foo.bar', both of which are still subscribed on their own topics.
        my @cb_keys = grep { m/^req\.\Q$service\E\.[^.]+\z/ } keys %$callbacks;

        unless (@cb_keys) {
            log_warn "Not previously accepting remote calls '$fq_meth' $at";
            next;
        }

        my $local_bus = $self->{_BUS}->{bus_role};

        # Build the shared subscription topic, mapping '.' to '/' and '*' to '#'
        my $topic = "\$share/BKPR/req/$local_bus/$service";
        $topic =~ tr|.*|/#|;

        # Cannot remove callbacks right now, as new requests could be in flight or be already
        # queued. We must wait for unsubscription completion, and then until the task queue
        # is empty to ensure that all received requests were processed. And even then wait a
        # bit more, as some brokers may send requests *after* unsubscription.
        my $postpone = sub {

            # Hold the stop condvar open only when a shutdown is in progress. Every
            # begin() must be paired with an end(): an unpaired begin() would leave
            # the condvar counter above zero forever, and a later shutdown would
            # then never be able to signal __work_forever to stop.
            my $holding_stop = $worker->{shutting_down} ? 1 : 0;

            $worker->{stop_cv}->begin if $holding_stop;

            my $unsub_tmr; $unsub_tmr = AnyEvent->timer(
                after => $UNSUBSCRIBE_LINGER, cb => sub {

                    delete $worker->{callbacks}->{$_} foreach @cb_keys;
                    delete $worker->{subscriptions}->{$service};
                    undef $unsub_tmr;

                    # Nothing else to do unless this task is holding the stop condvar
                    return unless $holding_stop;

                    if ($worker->{in_progress} > 0) {

                        # The task queue is empty now, but an asynchronous method handler is
                        # still busy processing some requests received previously. Wait for
                        # these requests to be completed before telling __work_forever to stop

                        my $wait_time = 60;
                        $worker->{stop_cv}->begin;

                        # Poll once per second, giving up after $wait_time polls
                        my $busy_tmr; $busy_tmr = AnyEvent->timer( after => 1, interval => 1, cb => sub {
                            unless ($worker->{in_progress} > 0 && --$wait_time > 0) {
                                undef $busy_tmr;
                                $worker->{stop_cv}->end;
                            }
                        });
                    }

                    # Tell __work_forever to stop
                    $worker->{stop_cv}->end;
                }
            );
        };

        $self->{_BUS}->unsubscribe(
            topic       => $topic,
            on_unsuback => sub {
                my ($success, $prop) = @_;

                log_error "Could not unsubscribe from topic '$topic' $at" unless $success;

                # Queue the cleanup even on failure, it is run by __drain_task_queue
                my $postponed = $worker->{postponed} ||= [];
                push @$postponed, $postpone;

                AnyEvent::postpone { $self->__drain_task_queue };
            }
        );
    }
}
834              
835              
# Main loop of the worker (called by WorkerPool->spawn_worker).
#
# Creates the stop condvar and blocks on it until it is signaled, then runs
# on_shutdown and reports the exit. Any exception raised meanwhile is logged
# as fatal and the process exits with code 255. Finally the bus connection
# is closed if it is still open.
sub __work_forever {
    my ($self) = @_;

    eval {

        my $worker = $self->{_WORKER};

        my $stop_cv = AnyEvent->condvar;
        $worker->{stop_cv} = $stop_cv;

        # Blocks here until stop_working is called
        $stop_cv->recv;

        $self->on_shutdown;

        $self->__report_exit;
    };

    if (my $error = $@) {
        log_fatal "Worker died: $error";
        CORE::exit(255);
    }

    $self->{_BUS}->disconnect if $self->{_BUS}->{is_connected};
}
864              
865              
# Start a graceful shutdown of the worker (default handler for TERM signal).
#
# Only the first call has any effect. If the worker did not complete its
# initialization yet (there is no stop condvar) the process exits at once.
# Otherwise every service with registered 'req.*' callbacks (other than
# 'req._sync*' ones) is unsubscribed through stop_accepting_calls, or, when
# there is none, the stop condvar is signaled directly.
sub stop_working {
    my ($self, %args) = @_;

    my $worker = $self->{_WORKER};

    if ($worker->{shutting_down}) {
        return;
    }

    $worker->{shutting_down} = 1;

    # Worker did not complete initialization yet
    CORE::exit(0) if !defined $worker->{stop_cv};

    # Collect the distinct services being served (a list context match
    # returns the captured service name, or nothing when it does not match)
    my %services = map  { $_ => 1 }
                   map  { m/^req\.(?!_sync)(.*)\./ }
                   keys %{$worker->{callbacks}};

    if (!%services) {
        # Tell __work_forever to stop
        $worker->{stop_cv}->send;
    }
    else {
        # Cannot exit right now, as some requests could be in flight or already queued.
        # So tell the broker to stop sending requests, and exit after the task queue is empty
        foreach my $service (keys %services) {

            $self->stop_accepting_calls( $service . '.*' );
        }
    }
}
901              
902              
# Send the statistics of this worker to any supervisor.
#
# Computes, over the period elapsed since the previous report, the average
# number of calls, notifications and errors per second and the load (busy
# wall clock time as a percentage of the period). The counters are reset and
# the figures are sent, along with the list of served queues, as a
# '_bkpr.supervisor.worker_status' notification using the BKPR_SYSTEM
# auth token and no caller id.
sub __report_status {
    my $self = shift;

    my $worker = $self->{_WORKER};
    my $client = $self->{_CLIENT};

    my $now = Time::HiRes::time;
    my $period = $now - ($worker->{last_report} || ($now - 1));

    # Time::HiRes::time is wall clock time, which may be stepped backwards.
    # A zero period would cause a division by zero and a negative one would
    # report negative rates, so fall back to the same 1 second period that
    # is assumed for the first report.
    $period = 1 if $period <= 0;

    $worker->{last_report} = $now;

    # Average calls per second
    my $cps = sprintf("%.2f", $worker->{call_count} / $period);
    $worker->{call_count} = 0;

    # Average notifications per second
    my $nps = sprintf("%.2f", $worker->{notif_count} / $period);
    $worker->{notif_count} = 0;

    # Average errors per second
    my $err = sprintf("%.2f", $worker->{error_count} / $period);
    $worker->{error_count} = 0;

    # Average load as percentage of wall clock busy time (not cpu usage)
    my $load = sprintf("%.2f", ($BUSY_TIME - $worker->{busy_time}) / $period * 100);
    $worker->{busy_time} = $BUSY_TIME;

    # Queues: distinct services with 'req.*' callbacks, except 'req._sync*' ones
    my %queues;
    foreach my $queue (keys %{$worker->{callbacks}}) {
        next unless $queue =~ m/^req\.(?!_sync)(.*)\./;
        $queues{$1} = 1;
    }

    # Send the report with system credentials and without caller id
    local $client->{auth_data} = $AUTH_TOKENS{'BKPR_SYSTEM'};
    local $client->{caller_id};

    # Tell any supervisor our stats
    $self->fire_remote(
        method => '_bkpr.supervisor.worker_status',
        params => {
            class => ref($self),
            host  => $worker->{hostname},
            pool  => $worker->{pool_id},
            pid   => $$,
            cps   => $cps,
            nps   => $nps,
            err   => $err,
            load  => $load,
            queue => [ keys %queues ],
        },
    );
}
956              
# Tell any supervisor that this worker is exiting.
#
# Sends a '_bkpr.supervisor.worker_exit' notification identifying this
# worker by class, host, pool and pid, using the BKPR_SYSTEM auth token and
# no caller id. Does nothing when the bus is not connected.
sub __report_exit {
    my ($self) = @_;

    # Skip the report when the bus is not connected
    $self->{_BUS}->{is_connected} or return;

    my ($worker, $client) = @$self{qw( _WORKER _CLIENT )};

    # Send the report with system credentials and without caller id
    local $client->{auth_data} = $AUTH_TOKENS{'BKPR_SYSTEM'};
    local $client->{caller_id};

    my %report = (
        class => ref($self),
        host  => $worker->{hostname},
        pool  => $worker->{pool_id},
        pid   => $$,
    );

    $self->fire_remote(
        method => '_bkpr.supervisor.worker_exit',
        params => \%report,
    );
}
978              
979             1;
980              
981             __END__