File Coverage

lib/Beekeeper/Service/Router/Worker.pm
Criterion Covered Total %
statement 33 267 12.3
branch 0 66 0.0
condition 0 40 0.0
subroutine 11 38 28.9
pod 3 10 30.0
total 47 421 11.1


line stmt bran cond sub pod time code
1             package Beekeeper::Service::Router::Worker;
2              
3 1     1   1011 use strict;
  1         2  
  1         40  
4 1     1   7 use warnings;
  1         2  
  1         44  
5              
6             our $VERSION = '0.10';
7              
8 1     1   5 use Beekeeper::Worker ':log';
  1         2  
  1         131  
9 1     1   7 use base 'Beekeeper::Worker';
  1         1  
  1         65  
10              
11 1     1   6 use Beekeeper::Worker::Extension::SharedCache;
  1         2  
  1         68  
12 1     1   7 use Scalar::Util 'weaken';
  1         2  
  1         57  
13              
14 1     1   6 use constant FRONTEND_ROLE =>'frontend';
  1         1  
  1         63  
15 1     1   6 use constant SESSION_TIMEOUT => 1800;
  1         2  
  1         45  
16 1     1   5 use constant SHUTDOWN_WAIT => 2;
  1         2  
  1         63  
17 1     1   7 use constant QUEUE_LANES => 2;
  1         1  
  1         57  
18 1     1   7 use constant DEBUG => 0;
  1         2  
  1         3752  
19              
20             $Beekeeper::Worker::LogLevel = 9 if DEBUG;
21              
22              
# Authorization hook for incoming requests.
#
# Grants access only when the request carries the 'BKPR_ROUTER' authorization
# token. Returns BKPR_REQUEST_AUTHORIZED in that case, and nothing otherwise.
# The $req argument is accepted but not inspected here.
sub authorize_request {
    my ($self, $req) = @_;

    if ($self->__has_authorization_token('BKPR_ROUTER')) {
        return BKPR_REQUEST_AUTHORIZED;
    }

    return;
}
30              
# Startup hook: reads the worker configuration, builds the routing table and
# opens one connection per frontend bus.
#
# Config keys read from $self->{_WORKER}->{config} (each falls back to the
# module constant when unset or false):
#   session_timeout, shutdown_wait, frontend_role
#
# Dies if no bus with the configured frontend role is found.
sub on_startup {
    my $self = shift;
    weaken $self;

    my $worker_config = $self->{_WORKER}->{config};

    $self->{sess_timeout}  = $worker_config->{'session_timeout'} || SESSION_TIMEOUT;
    $self->{shutdown_wait} = $worker_config->{'shutdown_wait'}   || SHUTDOWN_WAIT;
    $self->{frontend_role} = $worker_config->{'frontend_role'}   || FRONTEND_ROLE;

    $self->_init_routing_table;

    my $frontend_role    = $self->{frontend_role};
    my $frontends_config = Beekeeper::Config->get_bus_group_config( bus_role => $frontend_role );

    unless (@$frontends_config) {
        die "No bus with role '$frontend_role' was found into config file bus.config.json\n";
    }

    # Each frontend connection calls begin/end on this condvar, so it becomes
    # ready once every frontend has either connected or failed
    $self->{wait_frontends_up} = AnyEvent->condvar;

    # Create a connection to every frontend
    foreach my $config (@$frontends_config) {
        $self->init_frontend_connection( $config );
    }

    # Ping frontend brokers to avoid disconnections due to inactivity.
    # The first ping is randomly delayed by up to 60 seconds
    $self->{ping_timer} = AnyEvent->timer(
        after    => 60 * rand(),
        interval => 60,
        cb       => sub { $self->ping_frontend_brokers },
    );
}
66              
# Open a connection to a single frontend bus and start routing through it.
#
# $config is the bus configuration hash; its 'bus_id' entry identifies the
# frontend. On the first successful connection the three forwarding flows are
# set up. After an error the bus is removed from $self->{FRONTEND} and a
# reconnection is scheduled, which restores request forwarding only.
sub init_frontend_connection {
    my ($self, $config) = @_;
    weaken $self;

    my $bus_id  = $config->{'bus_id'};
    my $back_id = $self->{_BUS}->bus_id;

    $self->{wait_frontends_up}->begin;

    my $bus;

    # Invoked when a reconnection attempt is acknowledged by the broker
    my $on_reconnected = sub {
        # Setup routing
        log_warn "Rerouting: $back_id <--> $bus_id";
        $self->{FRONTEND}->{$bus_id} = $bus;
        $self->pull_frontend_requests( frontend => $bus );
    };

    # Invoked on connection errors: forget the bus and retry later
    my $on_error = sub {
        my $errmsg = $_[0] || "";
        $errmsg =~ s/\s+/ /sg;

        log_alert "Connection to $bus_id failed: $errmsg";
        delete $self->{FRONTEND}->{$bus_id};
        $self->{wait_frontends_up}->end;

        # Retry delay grows by 3 seconds per previous failure, capped at 30
        my $failures    = $self->{connect_err}->{$bus_id}++;
        my $retry_after = $failures < 10 ? $failures * 3 : 30;

        $self->{reconnect_tmr}->{$bus_id} = AnyEvent->timer(
            after => $retry_after,
            cb    => sub { $bus->connect( on_connack => $on_reconnected ) },
        );
    };

    $bus = Beekeeper::MQTT->new(
        %$config,
        bus_id   => $bus_id,
        timeout  => 60,
        on_error => $on_error,
    );

    $bus->connect(
        on_connack => sub {
            # Setup routing
            log_info "Routing: $back_id <--> $bus_id";
            $self->{FRONTEND}->{$bus_id} = $bus;
            $self->{wait_frontends_up}->end;
            $self->pull_frontend_requests( frontend => $bus );
            $self->pull_backend_responses( frontend => $bus );
            $self->pull_backend_notifications( frontend => $bus );
        },
    );
}
115              
# Shutdown hook: stops all forwarding in an orderly way, then disconnects
# from every frontend bus and releases the shared session cache.
#
# Requests and notifications are stopped first; responses are stopped only
# after waiting $self->{shutdown_wait} seconds. Each wait for unsubscription
# acks is bounded by a 30 second timeout.
sub on_shutdown {
    my ($self, %args) = @_;

    log_info "Shutting down";

    my $frontend_role = $self->{frontend_role};

    my $backend_bus  = $self->{_BUS};
    my $backend_role = $backend_bus->bus_role;

    # Unsubscribe $bus from $topic, tracking the pending ack in $pending.
    # The condvar is passed by value so a late ack can never be accounted
    # against a condvar created afterwards
    my $unsubscribe = sub {
        my ($bus, $topic, $pending) = @_;

        $pending->begin;
        $bus->unsubscribe(
            topic       => $topic,
            on_unsuback => sub {
                my ($success, $prop) = @_;
                log_error "Could not unsubscribe from $topic" unless $success;
                $pending->end;
            }
        );
    };

    # Block until $pending is signaled, or until $timeout seconds have passed
    my $wait_for = sub {
        my ($pending, $timeout) = @_;

        my $tmr = AnyEvent->timer( after => $timeout, cb => sub { $pending->send });
        $pending->recv;
    };

    my $cv = AnyEvent->condvar;

    # The outer begin/end pair makes the condvar fire immediately when
    # there is nothing to unsubscribe from
    $cv->begin;

    # 1. Do not pull frontend requests anymore
    foreach my $frontend_bus (values %{$self->{FRONTEND}}) {

        foreach my $lane (1..QUEUE_LANES) {
            $unsubscribe->( $frontend_bus, "\$share/BKPR/req/$backend_role-$lane", $cv );
        }
    }

    # 2. Stop forwarding notifications to frontend
    foreach my $lane (1..QUEUE_LANES) {
        $unsubscribe->( $backend_bus, "\$share/BKPR/msg/$frontend_role-$lane", $cv );
    }

    $cv->end;

    # 3. Wait for unsubacks, assuring that no more requests or messages are buffered
    $wait_for->( $cv, 30 );

    # 4. Just in case of pool full stop, wait for workers to finish their current tasks
    $wait_for->( AnyEvent->condvar, $self->{shutdown_wait} );

    $cv = AnyEvent->condvar;
    $cv->begin;

    # 5. Stop forwarding responses to frontend
    foreach my $frontend_bus (values %{$self->{FRONTEND}}) {

        my $frontend_id = $frontend_bus->bus_id;

        foreach my $lane (1..QUEUE_LANES) {
            $unsubscribe->( $backend_bus, "\$share/BKPR/res/$frontend_id-$lane", $cv );
        }
    }

    $cv->end;

    # 6. Wait for unsubacks, assuring that no more responses are buffered.
    #    Without the begin/end guard this would stall for the whole timeout
    #    when no frontend is connected
    $wait_for->( $cv, 30 );

    # Disconnect from all frontends
    my @frontends = values %{$self->{FRONTEND}};

    foreach my $frontend_bus (@frontends) {

        next unless ($frontend_bus->{is_connected});
        $frontend_bus->disconnect;
    }

    # Disconnect shared cache
    undef $self->{MqttSessions};

    log_info "Stopped";
}
209              
# Get requests from a frontend bus and forward them to the backend bus.
#
#   from:  req/backend-n                @frontend
#   to:    req/backend/{app}/{service}  @backend
#
# Named arguments:
#   frontend - connection object of the frontend bus to pull requests from
#
# Requests come from untrusted clients: the 'fwd_to' and 'response_topic'
# properties are validated and requests failing validation are silently
# dropped. If a session is bound to the caller, its stored auth data is
# attached to the forwarded request.
sub pull_frontend_requests {
    my ($self, %args) = @_;
    weaken $self;

    my $frontend_bus = $args{frontend};
    my $frontend_id  = $frontend_bus->bus_id;

    my $backend_bus  = $self->{_BUS};
    my $backend_id   = $backend_bus->bus_id;
    my $backend_role = $backend_bus->bus_role;

    foreach my $lane (1..QUEUE_LANES) {

        my $src_queue = "\$share/BKPR/req/$backend_role-$lane";

        # Declared once per subscription and reused by every invocation
        # of the on_publish callback
        my ($payload_ref, $mqtt_properties);
        my ($dest_queue, $reply_to, $caller_id, $mqtt_session);
        my %pub_args;

        $frontend_bus->subscribe(
            topic       => $src_queue,
            maximum_qos => 0,
            on_publish  => sub {
                ($payload_ref, $mqtt_properties) = @_;

                # (!) UNTRUSTED REQUEST

                # Patterns are anchored with \z, as a plain $ would also
                # accept values ending with a newline

                # eg: req/backend/myapp/service
                $dest_queue = $mqtt_properties->{'fwd_to'} || '';
                return unless $dest_queue =~ m|^req(?:/(?!_)[\w-]+)+\z|;

                # eg: priv/7nXDsxMDwgLUSedX
                $reply_to = $mqtt_properties->{'response_topic'} || '';
                return unless $reply_to =~ m|^priv/(\w{16,23})\z|;
                $caller_id = $1;

                #TODO: Extra sanity checks could be done here before forwarding to backend

                %pub_args = (
                    topic          => $dest_queue,
                    clid           => $caller_id,
                    response_topic => "res/$frontend_id-$lane",
                    addr           => "$reply_to\@$frontend_id",
                    payload        => $payload_ref,
                    qos            => 1, # because workers consume using QoS 1
                );

                $mqtt_session = $self->{MqttSessions}->get( $caller_id );

                if (defined $mqtt_session) {
                    # Keep the session alive and pass along its auth data
                    $self->{MqttSessions}->touch( $caller_id );
                    $pub_args{'auth'} = $mqtt_session->[2];
                }

                $backend_bus->publish( %pub_args );

                DEBUG && log_trace "Forwarded request: $src_queue \@$frontend_id --> $dest_queue \@$backend_id";

                $self->{_WORKER}->{call_count}++;
            },
            on_suback => sub {
                log_debug "Forwarding $src_queue \@$frontend_id --> req/$backend_role/{app}/{service} \@$backend_id";
            }
        );
    }
}
281              
# Get responses from the backend bus and forward them to a frontend bus.
#
#   from:  res/frontend-n     @backend
#   to:    priv/{session_id}  @frontend
#
# Named arguments:
#   frontend - connection object of the frontend bus to forward responses to
#
# The destination topic is the part of the 'addr' property preceding the
# first '@'. Responses lacking that property are logged and discarded.
sub pull_backend_responses {
    my ($self, %args) = @_;

    my $frontend_bus = $args{frontend};
    my $frontend_id  = $frontend_bus->bus_id;

    my $backend_bus = $self->{_BUS};
    my $backend_id  = $backend_bus->bus_id;

    foreach my $lane (1..QUEUE_LANES) {

        my $src_queue = "\$share/BKPR/res/$frontend_id-$lane";

        my ($payload_ref, $mqtt_properties, $dest_queue);

        $backend_bus->subscribe(
            topic       => $src_queue,
            maximum_qos => 0,
            on_publish  => sub {
                ($payload_ref, $mqtt_properties) = @_;

                my $addr = $mqtt_properties->{'addr'};

                unless (defined $addr) {
                    # There is no way to know where this response must be sent
                    log_error "Discarded response without addr received from $src_queue \@$backend_id";
                    return;
                }

                ($dest_queue) = split('@', $addr, 2);

                $frontend_bus->publish(
                    topic   => $dest_queue,
                    payload => $payload_ref,
                );

                DEBUG && log_trace "Forwarded response: $src_queue \@$backend_id --> $dest_queue \@$frontend_id";
            },
            on_suback => sub {
                log_debug "Forwarding $src_queue \@$backend_id --> priv/{session_id} \@$frontend_id";
            }
        );
    }
}
323              
# Get notifications from the backend bus and deliver them to frontend buses.
#
#   from:  msg/frontend-n                         @backend
#   to:    msg/frontend/{app}/{service}/{method}  @frontend
#
# Named arguments:
#   frontend - connection object of a frontend bus (used for log messages)
#
# The 'fwd_to' property has the form "{topic}" or "{topic}@{address}":
#   - with an address the notification is unicasted to the private topic of
#     every session currently bound to that address
#   - without an address it is broadcasted to all connected frontends
#
# Does nothing unless at least one frontend is connected and
# wait_frontends_up is ready.
sub pull_backend_notifications {
    my ($self, %args) = @_;
    weaken($self);

    unless (keys %{$self->{FRONTEND}} && $self->{wait_frontends_up}->ready) {
        # Wait until connected to all (working) frontends before pulling
        # notifications otherwise messages cannot be broadcasted properly
        #TODO: MQTT: broker will discard messages unless someone subscribes
        # NOTE(review): nothing visible in this module retries this call
        # after an early return -- confirm the last frontend always gets here
        return;
    }

    my $frontend_bus = $args{frontend};
    my $frontend_id  = $frontend_bus->bus_id;

    my $backend_bus = $self->{_BUS};
    my $backend_id  = $backend_bus->bus_id;

    my $frontend_role = $self->{frontend_role};

    foreach my $lane (1..QUEUE_LANES) {

        my $src_queue = "\$share/BKPR/msg/$frontend_role-$lane";

        my ($payload_ref, $mqtt_properties, $destination, $address);

        $backend_bus->subscribe(
            topic       => $src_queue,
            maximum_qos => 0,
            on_publish  => sub {
                ($payload_ref, $mqtt_properties) = @_;

                my $fwd_to = $mqtt_properties->{'fwd_to'};

                unless (defined $fwd_to) {
                    # There is no way to know where this notification must be sent
                    log_error "Discarded notification without fwd_to received from $src_queue \@$backend_id";
                    return;
                }

                ($destination, $address) = split('@', $fwd_to, 2);

                if (defined $address) {

                    # Unicast
                    my $dest_queues = $self->{Addr_to_topics}->{$address} || return;

                    foreach my $queue (@$dest_queues) {

                        # eg: priv/7nXDsxMDwgLUSedX@frontend-1
                        my ($private_topic, $bus_id) = split('@', $queue, 2);

                        my $target_bus = $self->{FRONTEND}->{$bus_id} || next;

                        $target_bus->publish(
                            topic   => $private_topic,
                            payload => $payload_ref,
                        );

                        DEBUG && log_trace "Forwarded notific: $src_queue \@$backend_id --> $private_topic \@$bus_id";
                    }
                }
                else {

                    # Broadcast
                    foreach my $target_bus (values %{$self->{FRONTEND}}) {

                        $target_bus->publish(
                            topic   => $destination,
                            payload => $payload_ref,
                        );

                        DEBUG && log_trace "Forwarded notific: $src_queue \@$backend_id --> $destination \@" . $target_bus->bus_id;
                    }
                }

                $self->{_WORKER}->{notif_count}++;
            },
            on_suback => sub {
                log_debug "Forwarding $src_queue \@$backend_id --> msg/frontend/{app}/{service}/{method} \@$frontend_id";
            }
        );
    }
}
403              
# Send a ping request through every frontend connection currently flagged
# as connected. Connections not flagged as connected are skipped.
sub ping_frontend_brokers {
    my $self = shift;

    for my $bus (values %{$self->{FRONTEND}}) {
        $bus->pingreq if $bus->{is_connected};
    }
}
413              
# Create the shared session cache and the in-memory indexes derived from it,
# and expose the bind/unbind remote calls.
#
# Each cache entry is keyed by caller_id and holds an array whose first two
# elements are the bound address and the caller reply address. Two indexes
# are kept in sync through the cache on_update callback:
#
#   Addr_to_topics:   address -> [ caller_addr, ... ]
#   Addr_to_sessions: address -> [ caller_id, ... ]
sub _init_routing_table {
    my $self = shift;

    # The callbacks below end up stored inside $self, so they must not
    # hold a strong reference to it
    weaken $self;

    $self->{Addr_to_topics}   = {};
    $self->{Addr_to_sessions} = {};

    # Add a session to both indexes, avoiding duplicated entries
    my $bind = sub {
        my ($caller_id, $addr, $topic) = @_;

        my $reply_topics = $self->{Addr_to_topics}->{$addr} ||= [];
        push @$reply_topics, $topic unless grep { $_ eq $topic } @$reply_topics;

        my $caller_sessions = $self->{Addr_to_sessions}->{$addr} ||= [];
        push @$caller_sessions, $caller_id unless grep { $_ eq $caller_id } @$caller_sessions;
    };

    # Remove a session from both indexes, dropping entries left empty
    my $unbind = sub {
        my ($caller_id, $addr, $topic) = @_;

        if (my $reply_topics = $self->{Addr_to_topics}->{$addr}) {
            @$reply_topics = grep { $_ ne $topic } @$reply_topics;
            delete $self->{Addr_to_topics}->{$addr} unless @$reply_topics;
        }

        if (my $caller_sessions = $self->{Addr_to_sessions}->{$addr}) {
            @$caller_sessions = grep { $_ ne $caller_id } @$caller_sessions;
            delete $self->{Addr_to_sessions}->{$addr} unless @$caller_sessions;
        }
    };

    $self->{MqttSessions} = $self->shared_cache(
        id        => "router",
        persist   => 1,
        max_age   => $self->{sess_timeout},
        on_update => sub {
            my ($caller_id, $value, $old_value) = @_;

            my ($addr,     $topic)     = defined $value     ? @$value[0,1]     : ();
            my ($old_addr, $old_topic) = defined $old_value ? @$old_value[0,1] : ();

            if (defined $old_addr) {
                # Session was removed or bound to something else: drop the
                # previous binding so the old address stops reaching it
                my $unchanged = defined $addr
                             && $addr eq $old_addr
                             && defined $topic && defined $old_topic
                             && $topic eq $old_topic;

                $unbind->( $caller_id, $old_addr, $old_topic ) unless $unchanged;
            }

            if (defined $addr) {
                $bind->( $caller_id, $addr, $topic );
            }
        },
    );

    $self->accept_remote_calls(
        '_bkpr.router.bind'   => 'bind_remote_session',
        '_bkpr.router.unbind' => 'unbind_remote_session',
    );
}
467              
# Remote call handler: bind a client session to an (optional) address.
#
# Parameters, read from the $params hash:
#   caller_id   - session id, 16 or more word characters       (required)
#   caller_addr - reply address, eg priv/{caller_id}@{bus_id}  (required)
#   address     - eg frontend.user-1234, it must belong to the
#                 namespace of the frontend role               (optional)
#   auth_data   - stored verbatim along with the session       (optional)
#
# Stores [ address, caller_addr, auth_data ] into the session cache, with
# the role prefix stripped from address. Returns 1, dies on invalid input.
sub bind_remote_session {
    my ($self, $params) = @_;

    my $address     = $params->{address};
    my $caller_id   = $params->{caller_id};
    my $caller_addr = $params->{caller_addr};
    my $auth_data   = $params->{auth_data};

    # Patterns are anchored with \z, as a plain $ would also accept
    # values ending with a newline

    unless (defined $caller_id && $caller_id =~ m/^\w{16,}\z/) {
        # eg: 7nXDsxMDwgLUSedX
        die ( $caller_id ? "Invalid caller_id $caller_id" : "caller_id not specified");
    }

    unless (defined $caller_addr && $caller_addr =~ m!^priv/\w+\@[\w-]+\z!) {
        # eg: priv/7nXDsxMDwgLUSedX@frontend-1
        die ( $caller_addr ? "Invalid caller_addr $caller_addr" : "caller_addr not specified");
    }

    if (defined $address) {

        my $frontend_role = $self->{frontend_role};

        unless ($address =~ m/^[\w-]+\.[\w-]+\z/) {
            # eg: frontend.user-1234
            die ( "Invalid address $address" );
        }

        # The role comes from configuration: quote it so that it is
        # always matched literally
        unless ($address =~ m/^\Q$frontend_role\E\./) {
            # eg: frontend.user-1234
            die ( "Invalid address $address: router can handle only $frontend_role.* namespace" );
        }

        $address =~ s/^\Q$frontend_role\E\.//;
    }

    $self->{MqttSessions}->set( $caller_id => [ $address, $caller_addr, $auth_data ] );

    return 1;
}
507              
# Remote call handler: remove a single session, or every session bound to
# an address.
#
# Parameters, read from the $params hash (at least one is required):
#   caller_id - id of the session to remove
#   address   - eg frontend.user-1234: all sessions bound to it are removed
#
# Returns 1, dies on invalid input.
sub unbind_remote_session {
    my ($self, $params) = @_;

    my $caller_id = $params->{caller_id};
    my $address   = $params->{address};

    my $frontend_role = $self->{frontend_role};

    # Patterns are anchored with \z, as a plain $ would also accept values
    # ending with a newline. The role comes from configuration, so it is
    # quoted in order to be matched literally

    if (defined $caller_id && $caller_id !~ m/^\w{16,}\z/) {
        # eg: 7nXDsxMDwgLUSedX
        die "Invalid caller_id $caller_id";
    }

    if (defined $address && $address !~ m/^\Q$frontend_role\E\.[\w-]+\z/) {
        # eg: @frontend.user-1234
        die "Invalid address $address";
    }

    unless ($caller_id || $address) {
        die "No caller_id nor address were specified";
    }

    if ($caller_id) {
        # Remove single session
        $self->{MqttSessions}->delete( $caller_id );
    }

    if ($address) {

        $address =~ s/^\Q$frontend_role\E\.//;

        my $sessions = $self->{Addr_to_sessions}->{$address};

        # Make a copy because @$sessions shortens on each delete
        my @sessions = $sessions ? @$sessions : ();

        # Remove all sessions bound to address
        foreach my $bound_id (@sessions) {
            $self->{MqttSessions}->delete( $bound_id );
        }
    }

    return 1;
}
552              
553             1;
554              
555             __END__