File Coverage

lib/Beekeeper/Service/Router/Worker.pm
Criterion Covered Total %
statement 33 255 12.9
branch 0 64 0.0
condition 0 40 0.0
subroutine 11 36 30.5
pod 3 9 33.3
total 47 404 11.6


line stmt bran cond sub pod time code
1             package Beekeeper::Service::Router::Worker;
2              
3 1     1   1067 use strict;
  1         2  
  1         32  
4 1     1   4 use warnings;
  1         2  
  1         45  
5              
6             our $VERSION = '0.08';
7              
8 1     1   6 use Beekeeper::Worker ':log';
  1         2  
  1         133  
9 1     1   7 use base 'Beekeeper::Worker';
  1         2  
  1         66  
10              
11 1     1   6 use Beekeeper::Worker::Extension::SharedCache;
  1         2  
  1         73  
12 1     1   8 use Scalar::Util 'weaken';
  1         1  
  1         47  
13              
14 1     1   6 use constant FRONTEND_ROLE =>'frontend';
  1         3  
  1         78  
15 1     1   7 use constant SESSION_TIMEOUT => 1800;
  1         3  
  1         59  
16 1     1   7 use constant SHUTDOWN_WAIT => 2;
  1         2  
  1         44  
17 1     1   5 use constant QUEUE_LANES => 2;
  1         2  
  1         53  
18 1     1   7 use constant DEBUG => 0;
  1         1  
  1         3667  
19              
20             $Beekeeper::Worker::LogLevel = 9 if DEBUG;
21              
22              
23             sub authorize_request {
24 0     0 1   my ($self, $req) = @_;
25              
26 0 0         return unless $self->__has_authorization_token('BKPR_ROUTER');
27              
28 0           return BKPR_REQUEST_AUTHORIZED;
29             }
30              
31             sub on_startup {
32 0     0 1   my $self = shift;
33              
34 0           my $worker_config = $self->{_WORKER}->{config};
35 0           my $bus_config = $self->{_WORKER}->{bus_config};
36              
37 0   0       $self->{sess_timeout} = $worker_config->{'session_timeout'} || SESSION_TIMEOUT;
38 0   0       $self->{shutdown_wait} = $worker_config->{'shutdown_wait'} || SHUTDOWN_WAIT;
39 0   0       $self->{frontend_role} = $worker_config->{'frontend_role'} || FRONTEND_ROLE;
40              
41 0           $self->_init_routing_table;
42              
43 0           my $frontend_role = $self->{frontend_role};
44 0           my $frontends_config = Beekeeper::Config->get_bus_group_config( bus_role => $frontend_role );
45              
46 0 0         unless (@$frontends_config) {
47 0           die "No bus with role '$frontend_role' was found into config file bus.config.json\n";
48             }
49              
50 0           $self->{wait_frontends_up} = AnyEvent->condvar;
51              
52             # Create a connection to every frontend
53 0           foreach my $config (@$frontends_config) {
54              
55 0           $self->init_frontend_connection( $config );
56             }
57             }
58              
59             sub init_frontend_connection {
60 0     0 0   my ($self, $config) = @_;
61              
62 0           my $bus_id = $config->{'bus_id'};
63 0           my $back_id = $self->{_BUS}->bus_id;
64              
65 0           $self->{wait_frontends_up}->begin;
66              
67 0           my $bus; $bus = Beekeeper::MQTT->new(
68             %$config,
69             bus_id => $bus_id,
70             timeout => 60,
71             on_error => sub {
72             # Reconnect
73 0   0 0     my $errmsg = $_[0] || ""; $errmsg =~ s/\s+/ /sg;
  0            
74 0           log_alert "Connection to $bus_id failed: $errmsg";
75 0           delete $self->{FRONTEND}->{$bus_id};
76 0           $self->{wait_frontends_up}->end;
77 0           my $delay = $self->{connect_err}->{$bus_id}++;
78             $self->{reconnect_tmr}->{$bus_id} = AnyEvent->timer(
79             after => ($delay < 10 ? $delay * 3 : 30),
80 0           cb => sub { $bus->connect },
81 0 0         );
82             },
83 0           );
84              
85             $bus->connect(
86             on_connack => sub {
87             # Setup routing
88 0     0     log_info "Routing: $back_id <--> $bus_id";
89 0           $self->{FRONTEND}->{$bus_id} = $bus;
90 0           $self->{wait_frontends_up}->end;
91 0           $self->pull_frontend_requests( frontend => $bus );
92 0           $self->pull_backend_responses( frontend => $bus );
93 0           $self->pull_backend_notifications( frontend => $bus );
94             },
95 0           );
96             }
97              
98             sub on_shutdown {
99 0     0 1   my ($self, %args) = @_;
100              
101 0           log_info "Shutting down";
102              
103 0           my $frontend_role = $self->{frontend_role};
104              
105 0           my $backend_bus = $self->{_BUS};
106 0           my $backend_role = $self->{_BUS}->{bus_role};
107              
108 0           my $cv = AnyEvent->condvar;
109              
110             # 1. Do not pull frontend requests anymore
111 0           foreach my $frontend_bus (values %{$self->{FRONTEND}}) {
  0            
112              
113 0           foreach my $lane (1..QUEUE_LANES) {
114              
115 0           my $topic = "\$share/BKPR/req/$backend_role-$lane";
116 0           $cv->begin;
117             $frontend_bus->unsubscribe(
118             topic => $topic,
119             on_unsuback => sub {
120 0     0     my ($success, $prop) = @_;
121 0 0         log_error "Could not unsubscribe from $topic" unless $success;
122 0           $cv->end;
123             }
124 0           );
125             }
126             }
127              
128             # 2. Stop forwarding notifications to frontend
129 0           foreach my $lane (1..QUEUE_LANES) {
130              
131 0           my $topic = "\$share/BKPR/msg/$frontend_role-$lane";
132 0           $cv->begin;
133             $backend_bus->unsubscribe(
134             topic => $topic,
135             on_unsuback => sub {
136 0     0     my ($success, $prop) = @_;
137 0 0         log_error "Could not unsubscribe from $topic" unless $success;
138 0           $cv->end;
139             }
140 0           );
141             }
142              
143             # 3. Wait for unsubacks, assuring that no more requests or messages are buffered
144 0     0     my $tmr = AnyEvent->timer( after => 30, cb => sub { $cv->send });
  0            
145 0           $cv->recv;
146              
147             # 4. Just in case of pool full stop, wait for workers to finish their current tasks
148 0           my $wait = AnyEvent->condvar;
149 0     0     $tmr = AnyEvent->timer( after => $self->{shutdown_wait}, cb => sub { $wait->send });
  0            
150 0           $wait->recv;
151              
152 0           $cv = AnyEvent->condvar;
153              
154             # 5. Stop forwarding responses to frontend
155 0           foreach my $frontend_bus (values %{$self->{FRONTEND}}) {
  0            
156              
157 0           my $frontend_id = $frontend_bus->bus_id;
158              
159 0           foreach my $lane (1..QUEUE_LANES) {
160              
161 0           my $topic = "\$share/BKPR/res/$frontend_id-$lane";
162 0           $cv->begin;
163             $backend_bus->unsubscribe(
164             topic => $topic,
165             on_unsuback => sub {
166 0     0     my ($success, $prop) = @_;
167 0 0         log_error "Could not unsubscribe from $topic" unless $success;
168 0           $cv->end;
169             }
170 0           );
171             }
172             }
173              
174             # 6. Wait for unsubacks, assuring that no more responses are buffered
175 0     0     $tmr = AnyEvent->timer( after => 30, cb => sub { $cv->send });
  0            
176 0           $cv->recv;
177              
178             # Disconnect from all frontends
179 0           my @frontends = values %{$self->{FRONTEND}};
  0            
180 0           foreach my $frontend_bus (@frontends) {
181              
182 0 0         next unless ($frontend_bus->{is_connected});
183 0           $frontend_bus->disconnect;
184             }
185              
186             # Disconnect shared cache
187 0           undef $self->{MqttSessions};
188              
189 0           log_info "Stopped";
190             }
191              
192             sub pull_frontend_requests {
193 0     0 0   my ($self, %args) = @_;
194 0           weaken($self);
195              
196             # Get requests from frontend bus and forward them to backend bus
197             #
198             # from: req/backend-n @frontend
199             # to: req/backend/{app}/{service} @backend
200              
201 0           my $frontend_bus = $args{frontend};
202 0           my $frontend_id = $frontend_bus->bus_id;
203              
204 0           my $backend_bus = $self->{_BUS};
205 0           my $backend_id = $backend_bus->bus_id;
206 0           my $backend_role = $backend_bus->bus_role;
207              
208 0           foreach my $lane (1..QUEUE_LANES) {
209              
210 0           my $src_queue = "\$share/BKPR/req/$backend_role-$lane";
211              
212 0           my ($payload_ref, $mqtt_properties);
213 0           my ($dest_queue, $reply_to, $caller_id, $mqtt_session);
214 0           my %pub_args;
215              
216             $frontend_bus->subscribe(
217             topic => $src_queue,
218             maximum_qos => 0,
219             on_publish => sub {
220 0     0     ($payload_ref, $mqtt_properties) = @_;
221              
222             # (!) UNTRUSTED REQUEST
223              
224             # eg: req/backend/myapp/service
225 0   0       $dest_queue = $mqtt_properties->{'fwd_to'} || '';
226 0 0         return unless $dest_queue =~ m|^req(?:/(?!_)[\w-]+)+$|;
227              
228             # eg: priv/7nXDsxMDwgLUSedX
229 0   0       $reply_to = $mqtt_properties->{'response_topic'} || '';
230 0 0         return unless $reply_to =~ m|^priv/(\w{16,23})$|;
231 0           $caller_id = $1;
232              
233             #TODO: Extra sanity checks could be done here before forwarding to backend
234              
235 0           %pub_args = (
236             topic => $dest_queue,
237             clid => $caller_id,
238             response_topic => "res/$frontend_id-$lane",
239             addr => "$reply_to\@$frontend_id",
240             payload => $payload_ref,
241             qos => 1, # because workers consume using QoS 1
242             );
243              
244 0           $mqtt_session = $self->{MqttSessions}->get( $caller_id );
245              
246 0 0         if (defined $mqtt_session) {
247 0           $self->{MqttSessions}->touch( $caller_id );
248 0           $pub_args{'auth'} = $mqtt_session->[2];
249             }
250              
251 0           $backend_bus->publish( %pub_args );
252              
253 0           DEBUG && log_trace "Forwarded request: $src_queue \@$frontend_id --> $dest_queue \@$backend_id";
254              
255 0           $self->{_WORKER}->{call_count}++;
256             },
257             on_suback => sub {
258 0     0     log_debug "Forwarding $src_queue \@$frontend_id --> req/$backend_role/{app}/{service} \@$backend_id";
259             }
260 0           );
261             }
262             }
263              
264             sub pull_backend_responses {
265 0     0 0   my ($self, %args) = @_;
266              
267             # Get responses from backend bus and forward them to frontend bus
268             #
269             # from: res/frontend-n @backend
270             # to: priv/{session_id} @frontend
271              
272 0           my $frontend_bus = $args{frontend};
273 0           my $frontend_id = $frontend_bus->bus_id;
274              
275 0           my $backend_bus = $self->{_BUS};
276 0           my $backend_id = $backend_bus->bus_id;
277              
278 0           foreach my $lane (1..QUEUE_LANES) {
279              
280 0           my $src_queue = "\$share/BKPR/res/$frontend_id-$lane";
281              
282 0           my ($payload_ref, $mqtt_properties, $dest_queue);
283              
284             $backend_bus->subscribe(
285             topic => $src_queue,
286             maximum_qos => 0,
287             on_publish => sub {
288 0     0     ($payload_ref, $mqtt_properties) = @_;
289              
290 0           ($dest_queue) = split('@', $mqtt_properties->{'addr'}, 2);
291              
292 0           $frontend_bus->publish(
293             topic => $dest_queue,
294             payload => $payload_ref,
295             );
296              
297 0           DEBUG && log_trace "Forwarded response: $src_queue \@$backend_id --> $dest_queue \@$frontend_id";
298             },
299             on_suback => sub {
300 0     0     log_debug "Forwarding $src_queue \@$backend_id --> priv/{session_id} \@$frontend_id";
301             }
302 0           );
303             }
304             }
305              
306             sub pull_backend_notifications {
307 0     0 0   my ($self, %args) = @_;
308 0           weaken($self);
309              
310             # Get notifications from backend bus and broadcast them to all frontend buses
311             #
312             # from: msg/frontend-n @backend
313             # to: msg/frontend/{app}/{service}/{method} @frontend
314              
315 0 0 0       unless (keys %{$self->{FRONTEND}} && $self->{wait_frontends_up}->ready) {
  0            
316             # Wait until connected to all (working) frontends before pulling
317             # notifications otherwise messages cannot be broadcasted properly
318             #TODO: MQTT: broker will discard messages unless someone subscribes
319 0           return;
320             }
321              
322 0           my $frontend_bus = $args{frontend};
323 0           my $frontend_id = $frontend_bus->bus_id;
324              
325 0           my $backend_bus = $self->{_BUS};
326 0           my $backend_id = $backend_bus->bus_id;
327              
328 0           my $frontend_role = $self->{frontend_role};
329              
330 0           foreach my $lane (1..QUEUE_LANES) {
331              
332 0           my $src_queue = "\$share/BKPR/msg/$frontend_role-$lane",
333              
334             my ($payload_ref, $mqtt_properties, $destination, $address);
335              
336             $backend_bus->subscribe(
337             topic => $src_queue,
338             maximum_qos => 0,
339             on_publish => sub {
340 0     0     ($payload_ref, $mqtt_properties) = @_;
341              
342 0           ($destination, $address) = split('@', $mqtt_properties->{'fwd_to'}, 2);
343              
344 0 0         if (defined $address) {
345              
346             # Unicast
347 0   0       my $dest_queues = $self->{Addr_to_topics}->{$address} || return;
348              
349 0           foreach my $queue (@$dest_queues) {
350              
351 0           my ($destination, $bus_id) = split('@', $queue, 2);
352              
353 0   0       my $frontend_bus = $self->{FRONTEND}->{$bus_id} || next;
354              
355 0           $frontend_bus->publish(
356             topic => $destination,
357             payload => $payload_ref,
358             );
359              
360 0           DEBUG && log_trace "Forwarded notific: $src_queue \@$backend_id --> $destination \@$frontend_id";
361             }
362             }
363             else {
364              
365             # Broadcast
366 0           foreach my $frontend_bus (values %{$self->{FRONTEND}}) {
  0            
367              
368 0           $frontend_bus->publish(
369             topic => $destination,
370             payload => $payload_ref,
371             );
372              
373 0           DEBUG && log_trace "Forwarded notific: $src_queue \@$backend_id --> $destination \@$frontend_id";
374             }
375             }
376              
377 0           $self->{_WORKER}->{notif_count}++;
378             },
379             on_suback => sub {
380 0     0     log_debug "Forwarding $src_queue \@$backend_id --> msg/frontend/{app}/{service}/{method} \@$frontend_id";
381             }
382 0           );
383             }
384             }
385              
386             sub _init_routing_table {
387 0     0     my $self = shift;
388              
389 0           $self->{Addr_to_topics} = {};
390 0           $self->{Addr_to_sessions} = {};
391              
392             $self->{MqttSessions} = $self->shared_cache(
393             id => "router",
394             persist => 1,
395             max_age => $self->{sess_timeout},
396             on_update => sub {
397 0     0     my ($caller_id, $value, $old_value) = @_;
398              
399             # Keep indexes: address -> [ caller_addr, ... ]
400             # address -> [ caller_id, ... ]
401              
402 0 0         if (defined $value) {
    0          
403             # Bind
404 0           my $addr = $value->[0];
405 0           my $topic = $value->[1];
406              
407 0 0         return unless defined $addr;
408              
409 0   0       my $relpy_topics = $self->{Addr_to_topics}->{$addr} ||= [];
410 0 0         return if grep { $_ eq $topic } @$relpy_topics;
  0            
411 0           push @$relpy_topics, $topic;
412              
413 0   0       my $caller_sessions = $self->{Addr_to_sessions}->{$addr} ||= [];
414 0           push @$caller_sessions, $caller_id;
415             }
416             elsif (defined $old_value) {
417             # Unbind
418 0           my $addr = $old_value->[0];
419 0           my $topic = $old_value->[1];
420              
421 0 0         return unless defined $addr;
422              
423 0   0       my $relpy_topics = $self->{Addr_to_topics}->{$addr} || return;
424 0           @$relpy_topics = grep { $_ ne $topic } @$relpy_topics;
  0            
425 0 0         delete $self->{Addr_to_topics}->{$addr} unless @$relpy_topics;
426              
427 0           my $caller_sessions = $self->{Addr_to_sessions}->{$addr};
428 0           @$caller_sessions = grep { $_ ne $caller_id } @$caller_sessions;
  0            
429 0 0         delete $self->{Addr_to_sessions}->{$addr} unless @$caller_sessions;
430             }
431             },
432 0           );
433              
434 0           $self->accept_remote_calls(
435             '_bkpr.router.bind' => 'bind_remote_session',
436             '_bkpr.router.unbind' => 'unbind_remote_session',
437             );
438             }
439              
440             sub bind_remote_session {
441 0     0 0   my ($self, $params) = @_;
442              
443 0           my $address = $params->{address};
444 0           my $caller_id = $params->{caller_id};
445 0           my $caller_addr = $params->{caller_addr};
446 0           my $auth_data = $params->{auth_data};
447              
448 0 0 0       unless (defined $caller_id && $caller_id =~ m/^\w{16,}$/) {
449             # eg: 7nXDsxMDwgLUSedX
450 0 0         die ( $caller_id ? "Invalid caller_id $caller_id" : "caller_id not specified");
451             }
452              
453 0 0 0       unless (defined $caller_addr && $caller_addr =~ m!^priv/\w+\@[\w-]+$!) {
454             # eg: priv/7nXDsxMDwgLUSedX@frontend-1
455 0 0         die ( $caller_id ? "Invalid caller_addr $caller_addr" : "caller_addr not specified");
456             }
457              
458 0 0         if (defined $address) {
459              
460 0           my $frontend_role = $self->{frontend_role};
461              
462 0 0         unless ($address =~ m/^[\w-]+\.[\w-]+$/) {
463             # eg: frontend.user-1234
464 0           die ( "Invalid address $address" );
465             }
466              
467 0 0         unless ($address =~ m/^$frontend_role\./) {
468             # eg: frontend.user-1234
469 0           die ( "Invalid address $address: router can handle only $frontend_role.* namespace" );
470             }
471              
472 0           $address =~ s/^$frontend_role\.//;
473             }
474              
475 0           $self->{MqttSessions}->set( $caller_id => [ $address, $caller_addr, $auth_data ] );
476              
477 0           return 1;
478             }
479              
480             sub unbind_remote_session {
481 0     0 0   my ($self, $params) = @_;
482              
483 0           my $caller_id = $params->{caller_id};
484 0           my $address = $params->{address};
485              
486 0           my $frontend_role = $self->{frontend_role};
487              
488 0 0 0       if (defined $caller_id && $caller_id !~ m/^\w{16,}$/) {
489             # eg: 7nXDsxMDwgLUSedX
490 0           die "Invalid caller_id $caller_id";
491             }
492              
493 0 0 0       if (defined $address && $address !~ m/^$frontend_role\.[\w-]+$/) {
494             # eg: @frontend.user-1234
495 0           die "Invalid address $address";
496             }
497              
498 0 0 0       unless ($caller_id || $address) {
499 0           die "No caller_id nor address were specified";
500             }
501              
502 0 0         if ($caller_id) {
503             # Remove single session
504 0           $self->{MqttSessions}->delete( $caller_id );
505             }
506              
507 0 0         if ($address) {
508              
509 0           $address =~ s/^$frontend_role\.//;
510              
511 0           my $sessions = $self->{Addr_to_sessions}->{$address};
512              
513             # Make a copy because @$sessions shortens on each delete
514 0 0         my @sessions = $sessions ? @$sessions : ();
515              
516             # Remove all sessions binded to address
517 0           foreach my $caller_id (@sessions) {
518 0           $self->{MqttSessions}->delete( $caller_id );
519             }
520             }
521              
522 0           return 1;
523             }
524              
525             1;
526              
527             __END__