File Coverage

lib/Beekeeper/Worker.pm
Criterion Covered Total %
statement 48 460 10.4
branch 0 162 0.0
condition 0 73 0.0
subroutine 16 62 25.8
pod 9 21 42.8
total 73 778 9.3


line stmt bran cond sub pod time code
1             package Beekeeper::Worker;
2              
3 1     1   1155 use strict;
  1         2  
  1         31  
4 1     1   4 use warnings;
  1         2  
  1         42  
5              
6             our $VERSION = '0.10';
7              
8 1     1   6 use Beekeeper::Client ':worker';
  1         1  
  1         170  
9 1     1   8 use Beekeeper::Logger ':log_levels';
  1         2  
  1         123  
10 1     1   7 use Beekeeper::JSONRPC;
  1         2  
  1         20  
11              
12 1     1   4 use JSON::XS;
  1         2  
  1         42  
13 1     1   6 use Time::HiRes ();
  1         2  
  1         12  
14 1     1   493 use Sys::Hostname ();
  1         1060  
  1         23  
15 1     1   6 use Compress::Raw::Zlib ();
  1         2  
  1         18  
16 1     1   5 use Digest::MD5 'md5_base64';
  1         4  
  1         42  
17 1     1   6 use Scalar::Util 'blessed';
  1         2  
  1         40  
18 1     1   5 use Carp;
  1         2  
  1         114  
19              
20 1     1   8 use constant COMPILE_ERROR_EXIT_CODE => 99;
  1         1  
  1         63  
21 1     1   6 use constant BKPR_REQUEST_AUTHORIZED => int(rand(90000000)+10000000);
  1         1  
  1         75  
22              
23 1     1   6 use Exporter 'import';
  1         2  
  1         2430  
24              
25             our @EXPORT = qw( BKPR_REQUEST_AUTHORIZED );
26              
27             our @EXPORT_OK = qw(
28             log_fatal
29             log_alert
30             log_critical
31             log_error
32             log_warn
33             log_warning
34             log_notice
35             log_info
36             log_debug
37             log_trace
38             log_level
39             );
40              
41             our %EXPORT_TAGS = ('log' => [ @EXPORT_OK, @EXPORT ]);
42              
43             our $Logger = sub { warn(@_) }; # redefined later by __init_logger
44             our $LogLevel = LOG_INFO;
45              
46 0 0   0 0   sub log_fatal (@) { $LogLevel >= LOG_FATAL && $Logger->( LOG_FATAL, @_ ) }
47 0 0   0 0   sub log_alert (@) { $LogLevel >= LOG_ALERT && $Logger->( LOG_ALERT, @_ ) }
48 0 0   0 0   sub log_critical (@) { $LogLevel >= LOG_CRIT && $Logger->( LOG_CRIT, @_ ) }
49 0 0   0 0   sub log_error (@) { $LogLevel >= LOG_ERROR && $Logger->( LOG_ERROR, @_ ) }
50 0 0   0 0   sub log_warn (@) { $LogLevel >= LOG_WARN && $Logger->( LOG_WARN, @_ ) }
51 0 0   0 0   sub log_warning (@) { $LogLevel >= LOG_WARN && $Logger->( LOG_WARN, @_ ) }
52 0 0   0 0   sub log_notice (@) { $LogLevel >= LOG_NOTICE && $Logger->( LOG_NOTICE, @_ ) }
53 0 0   0 0   sub log_info (@) { $LogLevel >= LOG_INFO && $Logger->( LOG_INFO, @_ ) }
54 0 0   0 0   sub log_debug (@) { $LogLevel >= LOG_DEBUG && $Logger->( LOG_DEBUG, @_ ) }
55 0 0   0 0   sub log_trace (@) { $LogLevel >= LOG_TRACE && $Logger->( LOG_TRACE, @_ ) }
56 0 0   0 0   sub log_level (;$) { $LogLevel = shift if scalar @_; return $LogLevel }
  0            
57              
58             our $BUSY_SINCE; *BUSY_SINCE = \$Beekeeper::MQTT::BUSY_SINCE;
59             our $BUSY_TIME; *BUSY_TIME = \$Beekeeper::MQTT::BUSY_TIME;
60              
61             our $REPORT_STATUS_PERIOD = 5;
62             our $UNSUBSCRIBE_LINGER = 2;
63              
64             my %AUTH_TOKENS;
65             my $DEFLATE;
66             my $JSON;
67              
68              
69             sub new {
70 0     0 0   my ($class, %args) = @_;
71              
72             # Parameters passed by WorkerPool->spawn_worker
73            
74 0           my $self = {
75             _WORKER => undef,
76             _CLIENT => undef,
77             _BUS => undef,
78             _LOGGER => undef,
79             };
80              
81 0           bless $self, $class;
82              
83             $self->{_WORKER} = {
84             parent_pid => $args{'parent_pid'},
85             foreground => $args{'foreground'}, # --foreground option
86             debug => $args{'debug'}, # --debug option
87             bus_config => $args{'bus_config'}, # content of bus.config.json
88             pool_config => $args{'pool_config'}, # content of pool.config.json
89             pool_id => $args{'pool_id'},
90             bus_id => $args{'bus_id'},
91 0           config => $args{'config'},
92             hostname => Sys::Hostname::hostname(),
93             stop_cv => undef,
94             callbacks => {},
95             task_queue_high => [],
96             task_queue_low => [],
97             queued_tasks => 0,
98             in_progress => 0,
99             last_report => 0,
100             call_count => 0,
101             notif_count => 0,
102             error_count => 0,
103             busy_time => 0,
104             };
105              
106 0           $JSON = JSON::XS->new;
107 0           $JSON->utf8; # encode result as utf8
108 0           $JSON->allow_blessed; # encode blessed references as null
109 0           $JSON->convert_blessed; # use TO_JSON methods to serialize objects
110              
111 0           $DEFLATE = Compress::Raw::Zlib::Deflate->new( -AppendOutput => 1 );
112              
113 0 0 0       if (defined $SIG{TERM} && $SIG{TERM} eq 'DEFAULT') {
114             # Stop working gracefully when TERM signal is received
115 0     0     $SIG{TERM} = sub { $self->stop_working };
  0            
116             }
117              
118 0 0 0       if (defined $SIG{INT} && $SIG{INT} eq 'DEFAULT' && $args{'foreground'}) {
      0        
119             # In foreground mode also stop working gracefully when INT signal is received
120 0     0     $SIG{INT} = sub { $self->stop_working };
  0            
121             }
122              
123 0           eval {
124              
125             # Init logger as soon as possible
126 0           $self->__init_logger;
127              
128             # Connect to broker
129 0           $self->__init_client;
130              
131             # Pass broker connection to logger
132 0 0         $self->{_LOGGER}->{_BUS} = $self->{_BUS} if (exists $self->{_LOGGER}->{_BUS});
133              
134 0           $self->__init_auth_tokens;
135              
136 0           $self->__init_worker;
137             };
138              
139 0 0         if ($@) {
140 0           log_fatal "Worker died while initialization: $@";
141 0           log_fatal "$class could not be started";
142 0           CORE::exit( COMPILE_ERROR_EXIT_CODE );
143             }
144              
145 0           return $self;
146             }
147              
148             sub __init_auth_tokens {
149 0     0     my ($self) = @_;
150              
151             # Using a hashing function makes harder to access the wrong worker pool by mistake,
152             # but it is not an effective access restriction: anyone with access to the backend
153             # bus credentials can easily inspect and clone auth data tokens
154              
155 0           my $salt = $self->{_CLIENT}->{auth_salt};
156              
157 0           $AUTH_TOKENS{'BKPR_SYSTEM'} = md5_base64('BKPR_SYSTEM'. $salt);
158 0           $AUTH_TOKENS{'BKPR_ADMIN'} = md5_base64('BKPR_ADMIN' . $salt);
159 0           $AUTH_TOKENS{'BKPR_ROUTER'} = md5_base64('BKPR_ROUTER'. $salt);
160             }
161              
162             sub __has_authorization_token {
163 0     0     my ($self, $auth_level) = @_;
164              
165 0           my $auth_data = $self->{_CLIENT}->{auth_data};
166              
167 0 0 0       return 0 unless $auth_data && $auth_level;
168 0 0         return 0 unless exists $AUTH_TOKENS{$auth_level};
169 0 0         return 0 unless $AUTH_TOKENS{$auth_level} eq $auth_data;
170              
171 0           return 1;
172             }
173              
174             sub __init_logger {
175 0     0     my $self = shift;
176              
177             # Honor --debug command line option and 'debug' config option from pool.config.json
178 0 0 0       $LogLevel = LOG_DEBUG if $self->{_WORKER}->{debug} || $self->{_WORKER}->{config}->{debug};
179              
180 0           my $log_handler = $self->log_handler;
181 0           $self->{_LOGGER} = $log_handler;
182              
183             $Logger = sub {
184             # ($level, @messages) = @_
185 0     0     $log_handler->log(@_);
186 0           };
187              
188 0     0     $SIG{__WARN__} = sub { $Logger->( LOG_WARN, @_ ) };
  0            
189             }
190              
191             sub log_handler {
192 0     0 1   my $self = shift;
193              
194             Beekeeper::Logger->new(
195             worker_class => ref $self,
196             foreground => $self->{_WORKER}->{foreground},
197             log_file => $self->{_WORKER}->{config}->{log_file},
198             host => $self->{_WORKER}->{hostname},
199             pool => $self->{_WORKER}->{pool_id},
200             _BUS => $self->{_BUS},
201             @_
202 0           );
203             }
204              
205             sub __init_client {
206 0     0     my $self = shift;
207              
208 0           my $bus_id = $self->{_WORKER}->{bus_id};
209 0           my $config = $self->{_WORKER}->{bus_config}->{$bus_id};
210              
211             my $client = Beekeeper::Client->new(
212             %$config,
213             timeout => 0, # retry forever
214             on_error => sub {
215 0   0 0     my $errmsg = $_[0] || ""; $errmsg =~ s/\s+/ /sg;
  0            
216 0           log_fatal "Connection to $bus_id failed: $errmsg";
217 0           $self->stop_working;
218             },
219 0           );
220              
221 0           $self->{_CLIENT} = $client->{_CLIENT};
222 0           $self->{_BUS} = $client->{_BUS};
223              
224 0           $Beekeeper::Client::singleton = $self;
225             }
226              
227             sub __init_worker {
228 0     0     my $self = shift;
229              
230 0           $self->on_startup;
231              
232 0           $self->__report_status;
233              
234 0           AnyEvent->now_update;
235              
236             $self->{_WORKER}->{report_status_timer} = AnyEvent->timer(
237             after => rand( $REPORT_STATUS_PERIOD ),
238             interval => $REPORT_STATUS_PERIOD,
239 0     0     cb => sub { $self->__report_status },
240 0           );
241             }
242              
243              
244             sub on_startup {
245             # Placeholder, intended to be overrided
246 0     0 1   my $class = ref $_[0];
247 0           log_fatal "Worker class $class doesn't define on_startup() method";
248             }
249              
250       0 1   sub on_shutdown {
251             # Placeholder, can be overrided
252             }
253              
254             sub authorize_request {
255             # Placeholder, must to be overrided
256 0     0 1   my $class = ref $_[0];
257 0           log_fatal "Worker class $class doesn't define authorize_request() method";
258 0           return undef; # do not authorize
259             }
260              
261              
262             sub accept_notifications {
263 0     0 1   my ($self, %args) = @_;
264              
265 0           my $worker = $self->{_WORKER};
266 0           my $callbacks = $worker->{callbacks};
267              
268 0           my ($file, $line) = (caller)[1,2];
269 0           my $at = "at $file line $line\n";
270              
271 0           foreach my $fq_meth (keys %args) {
272              
273 0 0         $fq_meth =~ m/^ ( [\w-]+ (?: \.[\w-]+ )* )
274             \. ( [\w-]+ | \* ) $/x or die "Invalid notification method '$fq_meth' $at";
275              
276 0           my ($service, $method) = ($1, $2);
277              
278 0           my $callback = $self->__get_cb_coderef($fq_meth, $args{$fq_meth});
279              
280 0 0         die "Already accepting notifications '$fq_meth' $at" if exists $callbacks->{"msg.$fq_meth"};
281 0           $callbacks->{"msg.$fq_meth"} = $callback;
282              
283 0           my $local_bus = $self->{_BUS}->{bus_role};
284              
285 0           my $topic = "msg/$local_bus/$service/$method";
286 0           $topic =~ tr|.*|/#|;
287              
288             $self->{_BUS}->subscribe(
289             topic => $topic,
290             on_publish => sub {
291             # ($payload_ref, $properties) = @_;
292              
293             # Enqueue notification
294 0     0     push @{$worker->{task_queue_high}}, [ @_ ];
  0            
295              
296 0 0         unless ($worker->{queued_tasks}) {
297 0           $worker->{queued_tasks} = 1;
298 0           AnyEvent::postpone { $self->__drain_task_queue };
  0            
299             }
300             },
301             on_suback => sub {
302 0     0     my ($success, $prop) = @_;
303 0 0         die "Could not subscribe to topic '$topic' $at" unless $success;
304             }
305 0           );
306             }
307             }
308              
309             sub __get_cb_coderef {
310 0     0     my ($self, $method, $callback) = @_;
311              
312 0 0 0       if (ref $callback eq 'CODE') {
    0          
313             # Already a coderef
314 0           return $callback;
315             }
316             elsif (!ref($callback) && $self->can($callback)) {
317             # Return a reference to given method
318 1     1   15 no strict 'refs';
  1         3  
  1         4631  
319 0           my $class = ref $self;
320 0           return \&{"${class}::${callback}"};
  0            
321             }
322             else {
323 0           my ($file, $line) = (caller(1))[1,2];
324 0           my $at = "at $file line $line\n";
325 0           die "Invalid handler '$callback' for '$method' $at";
326             }
327             }
328              
329              
330             sub accept_remote_calls {
331 0     0 1   my ($self, %args) = @_;
332              
333 0           my $worker = $self->{_WORKER};
334 0           my $callbacks = $worker->{callbacks};
335 0           my %subscribed_to;
336              
337 0           my ($file, $line) = (caller)[1,2];
338 0           my $at = "at $file line $line\n";
339              
340 0           foreach my $fq_meth (keys %args) {
341              
342 0 0         $fq_meth =~ m/^ ( [\w-]+ (?: \.[\w-]+ )* )
343             \. ( [\w-]+ | \* ) $/x or die "Invalid remote call method '$fq_meth' $at";
344              
345 0           my ($service, $method) = ($1, $2);
346              
347 0           my $callback = $self->__get_cb_coderef($fq_meth, $args{$fq_meth});
348              
349 0 0         die "Already accepting remote calls '$fq_meth' $at" if exists $callbacks->{"req.$fq_meth"};
350 0           $callbacks->{"req.$fq_meth"} = $callback;
351              
352 0 0         next if $subscribed_to{$service};
353 0           $subscribed_to{$service} = 1;
354              
355 0 0         if (keys %subscribed_to == 2) {
356 0           log_warn "Running multiple services within a single worker hurts load balancing $at";
357             }
358              
359 0           my $local_bus = $self->{_BUS}->{bus_role};
360              
361 0           my $topic = "\$share/BKPR/req/$local_bus/$service";
362 0           $topic =~ tr|.*|/#|;
363              
364             $self->{_BUS}->subscribe(
365             topic => $topic,
366             maximum_qos => 1,
367             on_publish => sub {
368             # ($payload_ref, $mqtt_properties) = @_;
369              
370             # Enqueue request
371 0     0     push @{$worker->{task_queue_low}}, [ @_ ];
  0            
372              
373 0 0         unless ($worker->{queued_tasks}) {
374 0           $worker->{queued_tasks} = 1;
375 0           AnyEvent::postpone { $self->__drain_task_queue };
  0            
376             }
377             },
378             on_suback => sub {
379 0     0     my ($success, $prop) = @_;
380 0 0         die "Could not subscribe to topic '$topic' $at" unless $success;
381             }
382 0           );
383             }
384             }
385              
386             my $_TASK_QUEUE_DEPTH = 0;
387              
388             sub __drain_task_queue {
389 0     0     my $self = shift;
390              
391             # Ensure that draining does not recurse
392 0 0         Carp::confess "Unexpected task queue processing recursion" if $_TASK_QUEUE_DEPTH;
393 0           $_TASK_QUEUE_DEPTH++;
394              
395 0           my $timing_tasks;
396              
397 0 0         unless (defined $BUSY_SINCE) {
398             # Measure time elapsed while processing requests
399 0           $BUSY_SINCE = Time::HiRes::time;
400 0           $timing_tasks = 1;
401             }
402              
403 0           my $worker = $self->{_WORKER};
404 0           my $client = $self->{_CLIENT};
405 0           my $task;
406              
407             # When requests or notifications are received these are not executed immediately
408             # because that could happen in the middle of the process of another request,
409             # so these tasks get queued until the worker is ready to process the next one.
410             #
411             # Callbacks are executed here, exception handling is done here, responses are
412             # sent back here. This is one of the most important methods of the framework.
413             #
414             # Notifications have higher priority and are processed first.
415              
416             DRAIN: {
417              
418 0           while ($task = shift @{$worker->{task_queue_high}}) {
  0            
  0            
419              
420             ## Notification
421              
422 0           my ($payload_ref, $mqtt_properties) = @$task;
423              
424 0           $worker->{notif_count}++;
425              
426 0           eval {
427              
428 0           my $request = decode_json($$payload_ref);
429              
430 0 0 0       unless (ref $request eq 'HASH' && $request->{jsonrpc} eq '2.0') {
431 0           log_error "Received invalid JSON-RPC 2.0 notification";
432 0           return;
433             }
434              
435 0           bless $request, 'Beekeeper::JSONRPC::Notification';
436 0           $request->{_mqtt_properties} = $mqtt_properties;
437              
438 0           my $method = $request->{method};
439              
440 0 0 0       unless (defined $method && $method =~ m/^([\.\w-]+)\.([\w-]+)$/) {
441 0           log_error "Received notification with invalid method '$method'";
442 0           return;
443             }
444              
445             my $cb = $worker->{callbacks}->{"msg.$1.$2"} ||
446 0   0       $worker->{callbacks}->{"msg.$1.*"};
447              
448 0           local $client->{caller_id} = $mqtt_properties->{'clid'};
449 0           local $client->{caller_addr} = $mqtt_properties->{'addr'};
450 0           local $client->{auth_data} = $mqtt_properties->{'auth'};
451              
452 0 0 0       unless (($self->authorize_request($request) || "") eq BKPR_REQUEST_AUTHORIZED) {
453 0           log_error "Notification '$method' was not authorized";
454 0           return;
455             }
456              
457 0 0         unless ($cb) {
458 0           log_error "No handler found for received notification '$method'";
459 0           return;
460             }
461              
462 0           $cb->($self, $request->{params}, $request);
463             };
464              
465 0 0         if ($@) {
466             # Got an exception while processing message
467 0           log_error $@;
468 0           $worker->{error_count}++;
469             }
470             }
471              
472 0 0         if ($task = shift @{$worker->{task_queue_low}}) {
  0            
473              
474             ## RPC Call
475              
476 0           my ($payload_ref, $mqtt_properties) = @$task;
477              
478 0           $worker->{call_count}++;
479 0           my ($request, $request_id, $result, $response);
480              
481 0           $result = eval {
482              
483 0           $request = decode_json($$payload_ref);
484              
485 0 0 0       unless (ref $request eq 'HASH' && $request->{jsonrpc} eq '2.0') {
486 0           log_error "Received invalid JSON-RPC 2.0 request";
487 0           die Beekeeper::JSONRPC::Error->invalid_request;
488             }
489              
490 0           $request_id = $request->{id};
491 0           my $method = $request->{method};
492              
493 0           bless $request, 'Beekeeper::JSONRPC::Request';
494 0           $request->{_mqtt_properties} = $mqtt_properties;
495              
496 0 0 0       unless (defined $method && $method =~ m/^([\.\w-]+)\.([\w-]+)$/) {
497 0           log_error "Received request with invalid method '$method'";
498 0           die Beekeeper::JSONRPC::Error->method_not_found;
499             }
500              
501             my $cb = $worker->{callbacks}->{"req.$1.$2"} ||
502 0   0       $worker->{callbacks}->{"req.$1.*"};
503              
504 0           local $client->{caller_id} = $mqtt_properties->{'clid'};
505 0           local $client->{caller_addr} = $mqtt_properties->{'addr'};
506 0           local $client->{auth_data} = $mqtt_properties->{'auth'};
507              
508 0 0 0       unless (($self->authorize_request($request) || "") eq BKPR_REQUEST_AUTHORIZED) {
509 0           log_error "Request '$method' was not authorized";
510 0           die Beekeeper::JSONRPC::Error->request_not_authorized;
511             }
512              
513 0 0         unless ($cb) {
514 0           log_error "No handler found for received request '$method'";
515 0           die Beekeeper::JSONRPC::Error->method_not_found;
516             }
517              
518             # Execute method handler
519 0           $cb->($self, $request->{params}, $request);
520             };
521              
522 0 0 0       if ($@) {
    0          
    0          
523             # Got an exception while executing method handler
524 0 0 0       if (blessed($@) && $@->isa('Beekeeper::JSONRPC::Error')) {
525             # Handled exception
526 0           $response = $@;
527 0           $worker->{error_count}++;
528             }
529             else {
530             # Unhandled exception
531 0           log_error $@;
532 0           $worker->{error_count}++;
533 0           $response = Beekeeper::JSONRPC::Error->server_error;
534             # Sending exact error to caller is very handy, but it is also a security risk
535 0 0         $response->{error}->{data} = $@ if $worker->{debug};
536 0           $worker->{error_count}++;
537             }
538             }
539             elsif (blessed($result) && $result->isa('Beekeeper::JSONRPC::Error')) {
540             # Explicit error response
541 0           $response = $result;
542 0           $worker->{error_count}++;
543             }
544             elsif ($request->{_async_response}) {
545             # Response was deferred and will be sent later
546 0           $worker->{in_progress}++;
547 0           $request->{_worker} = $self;
548             }
549             else {
550             # Build a success response
551 0           $response = {
552             jsonrpc => '2.0',
553             result => $result,
554             };
555             }
556              
557 0 0 0       if (defined $request_id && defined $response) {
558              
559             # Send back response to caller
560              
561 0           $response->{id} = $request_id;
562              
563 0           my $json = eval { $JSON->encode( $response ) };
  0            
564              
565 0 0         if ($@) {
566             # Probably response contains blessed references
567 0           log_error "Couldn't serialize response as JSON: $@";
568 0           $response = Beekeeper::JSONRPC::Error->server_error;
569 0           $response->{id} = $request_id;
570 0           $json = $JSON->encode( $response );
571             }
572              
573 0 0 0       if ($request->{_deflate_response} && length($json) > $request->{_deflate_response}) {
574 0           my $compressed_json;
575 0           $DEFLATE->deflate(\$json, $compressed_json);
576 0           $DEFLATE->flush(\$compressed_json);
577 0           $DEFLATE->deflateReset;
578 0           $json = $compressed_json;
579             }
580              
581             # Request is acknowledged as received just after sending the response. So, if
582             # the process is abruptly interrupted here, the broker will send the request to
583             # another worker and it will be executed twice (acking the request just before
584             # processing it may cause unprocessed requests or undelivered responses)
585              
586             $self->{_BUS}->publish(
587             topic => $mqtt_properties->{'response_topic'},
588 0           addr => $mqtt_properties->{'addr'},
589             payload => \$json,
590             buffer_id => 'response',
591             );
592              
593 0 0         if (exists $mqtt_properties->{'packet_id'}) {
594              
595             $self->{_BUS}->puback(
596 0           packet_id => $mqtt_properties->{'packet_id'},
597             buffer_id => 'response',
598             );
599             }
600             else {
601             # Should not happen (clients must publish with QoS 1)
602 0           log_warn "Request published with QoS 0 to topic " . $mqtt_properties->{'topic'};
603             }
604              
605 0           $self->{_BUS}->flush_buffer( buffer_id => 'response' );
606             }
607             else {
608              
609             # Acknowledge requests that doesn't send a response right now (fire & forget calls
610             # and requests handled asynchronously), signaling the broker to send more requests
611              
612             $self->{_BUS}->puback(
613 0           packet_id => $mqtt_properties->{'packet_id'},
614             );
615             }
616             }
617              
618 0 0 0       redo DRAIN if (@{$worker->{task_queue_high}} || @{$worker->{task_queue_low}});
  0            
  0            
619              
620             # Execute tasks postponed until task queue is empty
621 0 0         if (exists $worker->{postponed}) {
622 0           $_->() foreach @{$worker->{postponed}};
  0            
623 0           delete $worker->{postponed};
624             }
625             }
626              
627 0           $_TASK_QUEUE_DEPTH--;
628              
629 0 0         if (defined $timing_tasks) {
630 0           $BUSY_TIME += Time::HiRes::time - $BUSY_SINCE;
631 0           undef $BUSY_SINCE;
632             }
633              
634 0           $worker->{queued_tasks} = 0;
635             }
636              
637             sub __send_response {
638 0     0     my ($self, $request, $result) = @_;
639              
640             # Send back async response to caller
641              
642 0           my ($timing_tasks, $response);
643              
644 0           $self->{_WORKER}->{in_progress}--;
645              
646             # fire & forget calls doesn't expect responses
647 0 0         return unless defined $request->{id};
648              
649 0 0         unless (defined $BUSY_SINCE) {
650 0           $BUSY_SINCE = Time::HiRes::time;
651 0           $timing_tasks = 1;
652             }
653              
654 0 0 0       if (blessed($result) && $result->isa('Beekeeper::JSONRPC::Error')) {
655             # Explicit error response
656 0           $response = $result;
657 0           $self->{_WORKER}->{error_count}++;
658             }
659             else {
660             # Build a success response
661 0           $response = {
662             jsonrpc => '2.0',
663             result => $result,
664             };
665             }
666              
667 0           $response->{id} = $request->{id};
668              
669 0           local $@;
670 0           my $json = eval { $JSON->encode( $response ) };
  0            
671              
672 0 0         if ($@) {
673             # Probably response contains blessed references
674 0           log_error "Couldn't serialize response as JSON: $@";
675 0           $response = Beekeeper::JSONRPC::Error->server_error;
676 0           $response->{id} = $request->{id};
677 0           $json = $JSON->encode( $response );
678 0           $self->{_WORKER}->{error_count}++;
679             }
680              
681 0 0 0       if ($request->{_deflate_response} && length($json) > $request->{_deflate_response}) {
682 0           my $compressed_json;
683 0           $DEFLATE->deflate(\$json, $compressed_json);
684 0           $DEFLATE->flush(\$compressed_json);
685 0           $DEFLATE->deflateReset;
686 0           $json = $compressed_json;
687             }
688              
689             $self->{_BUS}->publish(
690             topic => $request->{_mqtt_properties}->{'response_topic'},
691 0           addr => $request->{_mqtt_properties}->{'addr'},
692             payload => \$json,
693             );
694              
695 0 0         if (defined $timing_tasks) {
696 0           $BUSY_TIME += Time::HiRes::time - $BUSY_SINCE;
697 0           undef $BUSY_SINCE;
698             }
699             }
700              
701              
702             sub stop_accepting_notifications {
703 0     0 1   my ($self, @methods) = @_;
704              
705 0           my ($file, $line) = (caller)[1,2];
706 0           my $at = "at $file line $line\n";
707              
708 0 0         die "No method specified $at" unless @methods;
709              
710 0           foreach my $fq_meth (@methods) {
711              
712 0 0         $fq_meth =~ m/^ ( [\w-]+ (?: \.[\w-]+ )* )
713             \. ( [\w-]+ | \* ) $/x or die "Invalid method '$fq_meth' $at";
714              
715 0           my ($service, $method) = ($1, $2);
716              
717 0           my $worker = $self->{_WORKER};
718              
719 0 0         unless (defined $worker->{callbacks}->{"msg.$fq_meth"}) {
720 0           log_warn "Not previously accepting notifications '$fq_meth' $at";
721 0           next;
722             }
723              
724 0           my $local_bus = $self->{_BUS}->{bus_role};
725              
726 0           my $topic = "msg/$local_bus/$service/$method";
727 0           $topic =~ tr|.*|/#|;
728              
729             # Cannot remove callbacks right now, as new notifications could be in flight or be
730             # already queued. We must wait for unsubscription completion, and then until the
731             # notification queue is empty to ensure that all received ones were processed. And
732             # even then wait a bit more, as some brokers may send messages *after* unsubscription.
733             my $postpone = sub {
734              
735 0     0     my $unsub_tmr; $unsub_tmr = AnyEvent->timer(
736             after => $UNSUBSCRIBE_LINGER, cb => sub {
737              
738 0           delete $worker->{callbacks}->{"msg.$fq_meth"};
739 0           undef $unsub_tmr;
740             }
741 0           );
742 0           };
743              
744             $self->{_BUS}->unsubscribe(
745             topic => $topic,
746             on_unsuback => sub {
747 0     0     my ($success, $prop) = @_;
748              
749 0 0         log_error "Could not unsubscribe from topic '$topic' $at" unless $success;
750              
751 0   0       my $postponed = $worker->{postponed} ||= [];
752 0           push @$postponed, $postpone;
753              
754 0           AnyEvent::postpone { $self->__drain_task_queue };
  0            
755             }
756 0           );
757             }
758             }
759              
760              
761             sub stop_accepting_calls {
762 0     0 1   my ($self, @methods) = @_;
763              
764 0           my ($file, $line) = (caller)[1,2];
765 0           my $at = "at $file line $line\n";
766              
767 0 0         die "No method specified $at" unless @methods;
768              
769 0           foreach my $fq_meth (@methods) {
770              
771 0 0         $fq_meth =~ m/^ ( [\w-]+ (?: \.[\w-]+ )* )
772             \. ( [\w-]+ | \* ) $/x or die "Invalid remote call method '$fq_meth' $at";
773              
774 0           my ($service, $method) = ($1, $2);
775              
776 0 0         unless ($method eq '*') {
777             # Known limitation. As all calls for an entire service class are received
778             # through a single MQTT subscription (in order to load balance them), it is
779             # not possible to reject a single method. A workaround is to use a different
780             # class for each method that need to be individually rejected.
781 0           die "Cannot stop accepting individual methods, only '$service.*' is allowed $at";
782             }
783              
784 0           my $worker = $self->{_WORKER};
785 0           my $callbacks = $worker->{callbacks};
786              
787 0           my @cb_keys = grep { $_ =~ m/^req.\Q$service\E\b/ } keys %$callbacks;
  0            
788              
789 0 0         unless (@cb_keys) {
790 0           log_warn "Not previously accepting remote calls '$fq_meth' $at";
791 0           next;
792             }
793              
794 0           my $local_bus = $self->{_BUS}->{bus_role};
795              
796 0           my $topic = "\$share/BKPR/req/$local_bus/$service";
797 0           $topic =~ tr|.*|/#|;
798              
799             # Cannot remove callbacks right now, as new requests could be in flight or be already
800             # queued. We must wait for unsubscription completion, and then until the task queue
801             # is empty to ensure that all received requests were processed. And even then wait a
802             # bit more, as some brokers may send requests *after* unsubscription.
803             my $postpone = sub {
804              
805 0     0     $worker->{stop_cv}->begin;
806              
807 0           my $unsub_tmr; $unsub_tmr = AnyEvent->timer(
808             after => $UNSUBSCRIBE_LINGER, cb => sub {
809              
810 0           delete $worker->{callbacks}->{$_} foreach @cb_keys;
811 0           delete $worker->{subscriptions}->{$service};
812 0           undef $unsub_tmr;
813              
814 0 0         return unless $worker->{shutting_down};
815              
816 0 0         if ($worker->{in_progress} > 0) {
817              
818             # The task queue is empty now, but an asynchronous method handler is
819             # still busy processing some requests received previously. Wait for
820             # these requests to be completed before telling _work_forever to stop
821              
822 0           my $wait_time = 60;
823 0           $worker->{stop_cv}->begin;
824              
825 0           my $busy_tmr; $busy_tmr = AnyEvent->timer( after => 1, interval => 1, cb => sub {
826 0 0 0       unless ($worker->{in_progress} > 0 && --$wait_time > 0) {
827 0           undef $busy_tmr;
828 0           $worker->{stop_cv}->end;
829             }
830 0           });
831             }
832              
833             # Tell _work_forever to stop
834 0           $worker->{stop_cv}->end;
835             }
836 0           );
837 0           };
838              
839             $self->{_BUS}->unsubscribe(
840             topic => $topic,
841             on_unsuback => sub {
842 0     0     my ($success, $prop) = @_;
843              
844 0 0         log_error "Could not unsubscribe from topic '$topic' $at" unless $success;
845              
846 0   0       my $postponed = $worker->{postponed} ||= [];
847 0           push @$postponed, $postpone;
848              
849 0           AnyEvent::postpone { $self->__drain_task_queue };
  0            
850             }
851 0           );
852             }
853             }
854              
855              
856             sub __work_forever {
857 0     0     my $self = shift;
858              
859             # Called by WorkerPool->spawn_worker
860              
861 0           eval {
862              
863 0           my $worker = $self->{_WORKER};
864              
865 0           $worker->{stop_cv} = AnyEvent->condvar;
866              
867             # Blocks here until stop_working is called
868 0           $worker->{stop_cv}->recv;
869              
870 0           $self->on_shutdown;
871              
872 0           $self->__report_exit;
873             };
874              
875 0 0         if ($@) {
876 0           log_fatal "Worker died: $@";
877 0           CORE::exit(255);
878             }
879              
880 0 0         if ($self->{_BUS}->{is_connected}) {
881 0           $self->{_BUS}->disconnect;
882             }
883             }
884              
885              
886             sub stop_working {
887 0     0 1   my ($self, %args) = @_;
888              
889 0           my $worker = $self->{_WORKER};
890              
891             # This is the default handler for TERM signal
892              
893 0 0         return if $worker->{shutting_down};
894 0           $worker->{shutting_down} = 1;
895              
896 0 0         unless (defined $worker->{stop_cv}) {
897             # Worker did not completed initialization yet
898 0           CORE::exit(0);
899             }
900              
901 0           my %services;
902 0           foreach my $fq_meth (keys %{$worker->{callbacks}}) {
  0            
903 0 0         next unless $fq_meth =~ m/^req\.(?!_sync)(.*)\./;
904 0           $services{$1} = 1;
905             }
906              
907 0 0         if (keys %services) {
908              
909             # Cannot exit right now, as some requests could be in flight or already queued.
910             # So tell the broker to stop sending requests, and exit after the task queue is empty
911 0           foreach my $service (keys %services) {
912              
913 0           $self->stop_accepting_calls( $service . '.*' );
914             }
915             }
916             else {
917             # Tell _work_forever to stop
918 0           $worker->{stop_cv}->send;
919             }
920             }
921              
922              
923             sub __report_status {
924 0     0     my $self = shift;
925              
926 0           my $worker = $self->{_WORKER};
927 0           my $client = $self->{_CLIENT};
928              
929 0           my $now = Time::HiRes::time;
930 0   0       my $period = $now - ($worker->{last_report} || ($now - 1));
931              
932 0           $worker->{last_report} = $now;
933              
934             # Average calls per second
935 0           my $cps = sprintf("%.2f", $worker->{call_count} / $period);
936 0           $worker->{call_count} = 0;
937              
938             # Average notifications per second
939 0           my $nps = sprintf("%.2f", $worker->{notif_count} / $period);
940 0           $worker->{notif_count} = 0;
941              
942             # Average errors per second
943 0           my $err = sprintf("%.2f", $worker->{error_count} / $period);
944 0           $worker->{error_count} = 0;
945              
946             # Average load as percentage of wall clock busy time (not cpu usage)
947 0           my $load = sprintf("%.2f", ($BUSY_TIME - $worker->{busy_time}) / $period * 100);
948 0           $worker->{busy_time} = $BUSY_TIME;
949              
950             # Queues
951 0           my %queues;
952 0           foreach my $queue (keys %{$worker->{callbacks}}) {
  0            
953 0 0         next unless $queue =~ m/^req\.(?!_sync)(.*)\./;
954 0           $queues{$1} = 1;
955             }
956              
957 0           local $client->{auth_data} = $AUTH_TOKENS{'BKPR_SYSTEM'};
958 0           local $client->{caller_id};
959              
960             # Tell any supervisor our stats
961             $self->fire_remote(
962             method => '_bkpr.supervisor.worker_status',
963             params => {
964             class => ref($self),
965             host => $worker->{hostname},
966             pool => $worker->{pool_id},
967 0           pid => $$,
968             cps => $cps,
969             nps => $nps,
970             err => $err,
971             load => $load,
972             queue => [ keys %queues ],
973             },
974             );
975             }
976              
977             sub __report_exit {
978 0     0     my $self = shift;
979              
980 0 0         return unless $self->{_BUS}->{is_connected};
981              
982 0           my $worker = $self->{_WORKER};
983 0           my $client = $self->{_CLIENT};
984              
985 0           local $client->{auth_data} = $AUTH_TOKENS{'BKPR_SYSTEM'};
986 0           local $client->{caller_id};
987              
988             $self->fire_remote(
989             method => '_bkpr.supervisor.worker_exit',
990             params => {
991             class => ref($self),
992             host => $worker->{hostname},
993             pool => $worker->{pool_id},
994 0           pid => $$,
995             },
996             );
997             }
998              
999             1;
1000              
1001             __END__