File Coverage

lib/Beekeeper/Client.pm
Criterion Covered Total %
statement 195 289 67.4
branch 53 112 47.3
condition 14 44 31.8
subroutine 29 37 78.3
pod 10 11 90.9
total 301 493 61.0


line stmt bran cond sub pod time code
1             package Beekeeper::Client;
2              
3 11     11   962655 use strict;
  11         104  
  11         369  
4 11     11   59 use warnings;
  11         24  
  11         495  
5              
6             our $VERSION = '0.08';
7              
8 11     11   4764 use Beekeeper::AnyEvent;
  11         35  
  11         430  
9 11     11   7609 use Beekeeper::MQTT;
  11         40  
  11         703  
10 11     11   5603 use Beekeeper::JSONRPC;
  11         37  
  11         356  
11 11     11   4944 use Beekeeper::Config;
  11         34  
  11         387  
12              
13 11     11   77 use JSON::XS;
  11         23  
  11         540  
14 11     11   6071 use Sys::Hostname;
  11         12208  
  11         637  
15 11     11   83 use Time::HiRes;
  11         28  
  11         92  
16 11     11   1227 use Digest::MD5 'md5_base64';
  11         34  
  11         534  
17 11     11   69 use Carp;
  11         24  
  11         547  
18              
19 11     11   64 use constant QUEUE_LANES => 2;
  11         25  
  11         830  
20 11     11   90 use constant REQ_TIMEOUT => 60;
  11         23  
  11         693  
21              
22 11     11   75 use Exporter 'import';
  11         23  
  11         51599  
23              
24             our @EXPORT_OK = qw(
25             send_notification
26             call_remote
27             call_remote_async
28             fire_remote
29             wait_async_calls
30             get_authentication_data
31             set_authentication_data
32              
33             __do_rpc_request
34             __create_response_topic
35             __use_authorization_token
36             );
37              
38             our %EXPORT_TAGS = ('worker' => \@EXPORT_OK );
39              
40             our $singleton;
41              
42              
43             sub new {
44 6     6 0 84 my ($class, %args) = @_;
45              
46 6         150 my $self = {
47             _CLIENT => undef,
48             _BUS => undef,
49             };
50              
51             $self->{_CLIENT} = {
52 6         416 forward_to => undef,
53             response_topic => undef,
54             in_progress => undef,
55             curr_request => undef,
56             caller_id => undef,
57             caller_addr => undef,
58             auth_data => undef,
59             auth_salt => undef,
60             async_cv => undef,
61             correlation_id => 1,
62             callbacks => {},
63             };
64              
65 6 50 33     257 unless (exists $args{'username'} && exists $args{'password'}) {
66              
67             # Get broker connection parameters from config file
68              
69 6         56 my $bus_id = $args{'bus_id'};
70              
71 6 50       96 if (defined $bus_id) {
72             # Use parameters for specific bus
73 0         0 my $config = Beekeeper::Config->get_bus_config( bus_id => $bus_id );
74 0 0       0 croak "Bus '$bus_id' is not defined into config file bus.config.json" unless $config;
75 0         0 %args = ( %$config, %args );
76             }
77             else {
78 6         231 my $config = Beekeeper::Config->get_bus_config( bus_id => '*');
79 6 50       99 if (scalar(keys %$config) == 1) {
80             # Use the only config present
81 6         46 ($bus_id) = (keys %$config);
82 6         19 %args = ( %{$config->{$bus_id}}, bus_id => $bus_id, %args );
  6         110  
83             }
84             else {
85             # Use default parameters (if any)
86 0         0 my ($default) = grep { $config->{$_}->{default} } keys %$config;
  0         0  
87 0 0       0 croak "No default bus defined into config file bus.config.json" unless $default;
88 0         0 $bus_id = $config->{$default}->{'bus_id'};
89 0         0 %args = ( %{$config->{$default}}, bus_id => $bus_id, %args );
  0         0  
90             }
91             }
92             }
93              
94 6         36 $self->{_CLIENT}->{forward_to} = delete $args{'forward_to'};
95 6   33     119 $self->{_CLIENT}->{auth_salt} = delete $args{'auth_salt'} || $args{'bus_id'};
96              
97             # Start a fresh new MQTT session on connect
98 6         69 $args{'clean_start'} = 1;
99              
100             # Make the MQTT session ends when the connection is closed
101 6         49 $args{'session_expiry_interval'} = 0;
102              
103             # Keep only 1 unacked message (of QoS 1) in flight
104 6         37 $args{'receive_maximum'} = 1;
105              
106             # Do not use topic aliases
107 6         30 $args{'topic_alias_maximum'} = 0;
108              
109              
110 6         245 $self->{_BUS} = Beekeeper::MQTT->new( %args );
111              
112             # Connect to MQTT broker
113 6         94 $self->{_BUS}->connect( blocking => 1 );
114              
115 6         82 bless $self, $class;
116 6         60 return $self;
117             }
118              
119             sub instance {
120 31     31 1 4586365 my $class = shift;
121              
122 31 100       422 if ($singleton) {
123             # Return existing singleton
124 25         313 return $singleton;
125             }
126              
127             # Create a new instance
128 6         168 my $self = $class->new( @_ );
129              
130             # Keep a global reference to $self
131 6         16 $singleton = $self;
132              
133 6         33 return $self;
134             }
135              
136              
137             sub send_notification {
138 5     5 1 766713 my ($self, %args) = @_;
139              
140 5 50       46 my $fq_meth = $args{'method'} or croak "Method was not specified";
141              
142 5 50       28 $fq_meth .= '@' . $args{'address'} if (defined $args{'address'});
143              
144 5 50       101 $fq_meth =~ m/^ ( [\w-]+ (?:\.[\w-]+)* )
145             \. ( [\w-]+ )
146             (?: \@ ( [\w-]+ ) (\.[\w-]+)* )? $/x or croak "Invalid method '$fq_meth'";
147              
148 5         47 my ($service, $method, $remote_bus, $addr) = ($1, $2, $3, $4);
149              
150             my $json = encode_json({
151             jsonrpc => '2.0',
152             method => "$service.$method",
153 5         89 params => $args{'params'},
154             });
155              
156 5         30 my %send_args;
157              
158 5         22 my $local_bus = $self->{_BUS}->{bus_role};
159              
160 5 50       126 $remote_bus = $self->{_CLIENT}->{forward_to} unless (defined $remote_bus);
161              
162 5 50       35 if (defined $remote_bus) {
163              
164 0         0 $send_args{'topic'} = "msg/$remote_bus-" . int( rand(QUEUE_LANES) + 1 );
165 0         0 $send_args{'topic'} =~ tr|.|/|;
166              
167 0         0 $send_args{'fwd_to'} = "msg/$remote_bus/$service/$method";
168 0 0 0     0 $send_args{'fwd_to'} .= "\@$addr" if (defined $addr && $addr =~ s/^\.//);
169 0         0 $send_args{'fwd_to'} =~ tr|.|/|;
170             }
171             else {
172 5         43 $send_args{'topic'} = "msg/$local_bus/$service/$method";
173 5         27 $send_args{'topic'} =~ tr|.|/|;
174             }
175              
176 5 50       22 $send_args{'auth'} = $self->{_CLIENT}->{auth_data} if defined $self->{_CLIENT}->{auth_data};
177 5 50       28 $send_args{'clid'} = $self->{_CLIENT}->{caller_id} if defined $self->{_CLIENT}->{caller_id};
178              
179 5 50       16 if (exists $args{'buffer_id'}) {
180 0         0 $send_args{'buffer_id'} = $args{'buffer_id'};
181             }
182              
183 5         46 $self->{_BUS}->publish( payload => \$json, %send_args );
184             }
185              
186              
187             sub accept_notifications {
188 0     0 1 0 my ($self, %args) = @_;
189              
190 0         0 my ($file, $line) = (caller)[1,2];
191 0         0 my $at = "at $file line $line\n";
192              
193 0         0 my $callbacks = $self->{_CLIENT}->{callbacks};
194              
195 0         0 foreach my $fq_meth (keys %args) {
196              
197 0 0       0 $fq_meth =~ m/^ ( [\w-]+ (?: \.[\w-]+ )* )
198             \. ( [\w-]+ | \* ) $/x or croak "Invalid notification method '$fq_meth'";
199              
200 0         0 my ($service, $method) = ($1, $2);
201              
202 0         0 my $callback = $args{$fq_meth};
203              
204 0 0       0 unless (ref $callback eq 'CODE') {
205 0         0 croak "Invalid callback for '$fq_meth'";
206             }
207              
208 0 0       0 croak "Already accepting notifications '$fq_meth'" if exists $callbacks->{"msg.$fq_meth"};
209 0         0 $callbacks->{"msg.$fq_meth"} = $callback;
210              
211             #TODO: Allow to accept private notifications without subscribing
212              
213 0         0 my $local_bus = $self->{_BUS}->{bus_role};
214              
215 0         0 my $topic = "msg/$local_bus/$service/$method";
216 0         0 $topic =~ tr|.*|/#|;
217              
218             $self->{_BUS}->subscribe(
219             topic => $topic,
220             on_publish => sub {
221 0     0   0 my ($payload_ref, $mqtt_properties) = @_;
222              
223 0         0 local $@;
224 0         0 my $request = eval { decode_json($$payload_ref) };
  0         0  
225              
226 0 0 0     0 unless (ref $request eq 'HASH' && $request->{jsonrpc} eq '2.0') {
227 0         0 warn "Received invalid JSON-RPC 2.0 notification $at";
228 0         0 return;
229             }
230              
231 0         0 bless $request, 'Beekeeper::JSONRPC::Notification';
232 0         0 $request->{_mqtt_properties} = $mqtt_properties;
233              
234 0         0 my $method = $request->{method};
235              
236 0 0 0     0 unless (defined $method && $method =~ m/^([\.\w-]+)\.([\w-]+)$/) {
237 0         0 warn "Received notification with invalid method '$method' $at";
238 0         0 return;
239             }
240              
241             my $cb = $callbacks->{"msg.$1.$2"} ||
242 0   0     0 $callbacks->{"msg.$1.*"};
243              
244 0 0       0 unless ($cb) {
245 0         0 warn "No callback found for received notification '$method' $at";
246 0         0 return;
247             }
248              
249 0         0 $cb->($request->{params}, $request);
250             },
251             on_suback => sub {
252 0     0   0 my ($success, $prop) = @_;
253 0 0       0 die "Could not subscribe to topic '$topic' $at" unless $success;
254             }
255 0         0 );
256             }
257             }
258              
259              
260             sub stop_accepting_notifications {
261 0     0 1 0 my ($self, @methods) = @_;
262              
263 0         0 my ($file, $line) = (caller)[1,2];
264 0         0 my $at = "at $file line $line\n";
265              
266 0 0       0 croak "No method specified" unless @methods;
267              
268 0         0 foreach my $fq_meth (@methods) {
269              
270 0 0       0 $fq_meth =~ m/^ ( [\w-]+ (?: \.[\w-]+ )* )
271             \. ( [\w-]+ | \* ) $/x or croak "Invalid method '$fq_meth'";
272              
273 0         0 my ($service, $method) = ($1, $2);
274              
275 0 0       0 unless (defined $self->{_CLIENT}->{callbacks}->{"msg.$fq_meth"}) {
276 0         0 carp "Not previously accepting notifications '$fq_meth'";
277 0         0 next;
278             }
279              
280 0         0 my $local_bus = $self->{_BUS}->{bus_role};
281              
282 0         0 my $topic = "msg/$local_bus/$service/$method";
283 0         0 $topic =~ tr|.*|/#|;
284              
285             $self->{_BUS}->unsubscribe(
286             topic => $topic,
287             on_unsuback => sub {
288 0     0   0 my ($success, $prop) = @_;
289              
290 0 0       0 die "Could not unsubscribe from topic '$topic' $at" unless $success;
291              
292 0         0 delete $self->{_CLIENT}->{callbacks}->{"msg.$fq_meth"};
293             },
294 0         0 );
295             }
296             }
297              
298              
299             our $AE_WAITING;
300              
301             sub call_remote {
302 52     52 1 5142 my $self = shift;
303              
304 52         360 my $req = $self->__do_rpc_request( @_, req_type => 'SYNCHRONOUS' );
305              
306             # Make AnyEvent allow one level of recursive condvar blocking, as we may
307             # block both in $worker->__work_forever and in $client->__do_rpc_request
308 51 50       166 $AE_WAITING && Carp::confess "Recursive blocking call attempted: " .
309             "trying to make a call_remote while another call_remote is still in progress, " .
310             "but it is not possible to make two blocking calls simultaneously " .
311             "(tip: one of the two calls must be made with call_remote_async)";
312              
313 51         160 local $AE_WAITING = 1;
314 51         147 local $AnyEvent::CondVar::Base::WAITING = 0;
315              
316             # Block until a response is received or request timed out
317 51         315 $req->{_waiting_response}->recv;
318              
319 51         2141 my $resp = $req->{_response};
320              
321 51 100 100     328 if (!exists $resp->{result} && $req->{_raise_error}) {
322 5         27 my $errmsg = $resp->code . " " . $resp->message;
323 5         1141 croak "Call to '$req->{method}' failed: $errmsg";
324             }
325              
326 46         560 return $resp;
327             }
328              
329             sub call_remote_async {
330 13     13 1 731 my $self = shift;
331              
332 13         38 my $req = $self->__do_rpc_request( @_, req_type => 'ASYNCHRONOUS' );
333              
334 13         38 return $req;
335             }
336              
337             sub fire_remote {
338 3     3 1 82 my $self = shift;
339              
340             # Send request to a worker, but do not wait for response
341 3         11 $self->__do_rpc_request( @_, req_type => 'FIRE_FORGET' );
342              
343 3         12 return;
344             }
345              
346             my $__now = 0;
347              
348             sub __do_rpc_request {
349 68     68   874 my ($self, %args) = @_;
350 68         259 my $client = $self->{_CLIENT};
351              
352 68 50       279 my $fq_meth = $args{'method'} or croak "Method was not specified";
353              
354 68 50       227 $fq_meth .= '@' . $args{'address'} if (defined $args{'address'});
355              
356 68 100       1335 $fq_meth =~ m/^ ( [\w-]+ (?:\.[\w-]+)* )
357             \. ( [\w-]+ )
358             (?: \@ ( [\w-]+ ) (\.[\w-]+)* )? $/x or croak "Invalid method '$fq_meth'";
359              
360 67         809 my ($service, $method, $remote_bus, $addr) = ($1, $2, $3, $4);
361              
362 67         142 my %send_args;
363              
364 67         220 my $local_bus = $self->{_BUS}->{bus_role};
365              
366 67 50       247 $remote_bus = $client->{forward_to} unless (defined $remote_bus);
367              
368             # Local bus request sent to: req/{local_bus}/{service_class}
369             # Remote bus request sent to: req/{remote_bus}
370              
371 67 50       232 if (defined $remote_bus) {
372              
373 0         0 $send_args{'topic'} = "req/$remote_bus-" . int( rand(QUEUE_LANES) + 1 );
374 0         0 $send_args{'topic'} =~ tr|.|/|;
375              
376 0         0 $send_args{'fwd_to'} = "req/$remote_bus/$service";
377 0 0 0     0 $send_args{'fwd_to'} .= "\@$addr" if (defined $addr && $addr =~ s/^\.//);
378 0         0 $send_args{'fwd_to'} =~ tr|.|/|;
379             }
380             else {
381 67         393 $send_args{'topic'} = "req/$local_bus/$service";
382 67         256 $send_args{'topic'} =~ tr|.|/|;
383             }
384              
385 67 100       330 $send_args{'auth'} = $client->{auth_data} if defined $client->{auth_data};
386 67 50       257 $send_args{'clid'} = $client->{caller_id} if defined $client->{caller_id};
387              
388 67         232 my $FIRE_FORGET = $args{req_type} eq 'FIRE_FORGET';
389 67         208 my $SYNCHRONOUS = $args{req_type} eq 'SYNCHRONOUS';
390 67         152 my $raise_error = $args{'raise_error'};
391 67         113 my $req_id;
392              
393             # JSON-RPC call
394             my $req = {
395             jsonrpc => '2.0',
396             method => "$service.$method",
397 67         525 params => $args{'params'},
398             };
399              
400             # Reuse or create a private topic which will receive responses
401             $send_args{'response_topic'} = $client->{response_topic} ||
402 67   66     376 $self->__create_response_topic;
403              
404 67 100       15722 unless ($FIRE_FORGET) {
405             # Assign an unique request id (unique only for this client)
406 64         213 $req_id = $client->{correlation_id}++;
407 64         245 $req->{'id'} = $req_id;
408             }
409              
410 67         813 my $json = encode_json($req);
411              
412 67 50       241 if (exists $args{'buffer_id'}) {
413 0         0 $send_args{'buffer_id'} = $args{'buffer_id'};
414             }
415              
416             # Send request
417             $self->{_BUS}->publish(
418 67         6027 payload => \$json,
419             qos => 1,
420             %send_args,
421             );
422              
423 67 100       297 if ($FIRE_FORGET) {
    100          
424             # Nothing else to do
425 3         18 return;
426             }
427             elsif ($SYNCHRONOUS) {
428              
429 51 100       249 $req->{_raise_error} = (defined $raise_error) ? $raise_error : 1;
430              
431             # Wait until a response is received in the reply queue
432 51         3051 $req->{_waiting_response} = AnyEvent->condvar;
433 51         841 $req->{_waiting_response}->begin;
434             }
435             else {
436              
437 13         31 $req->{_on_success_cb} = $args{'on_success'};
438 13         28 $req->{_on_error_cb} = $args{'on_error'};
439              
440 13 50 33     45 if ($raise_error && !$req->{_on_error_cb}) {
441             $req->{_on_error_cb} = sub {
442 0     0   0 my $errmsg = $_[0]->code . " " . $_[0]->message;
443 0         0 croak "Call to '$service.$method' failed: $errmsg";
444 0         0 };
445             }
446              
447             # Use shared cv for all requests
448 13 100 100     105 if (!$client->{async_cv} || $client->{async_cv}->ready) {
449 4         151 $client->{async_cv} = AnyEvent->condvar;
450             }
451              
452 13         90 $req->{_waiting_response} = $client->{async_cv};
453 13         42 $req->{_waiting_response}->begin;
454             }
455              
456 64         1024 $client->{in_progress}->{$req_id} = $req;
457              
458             # Ensure that timeout is set properly when the event loop was blocked
459 64 100       260 if ($__now != time) { $__now = time; AnyEvent->now_update }
  25         54  
  25         224  
460              
461             # Request timeout timer
462 64   100     844 my $timeout = $args{'timeout'} || REQ_TIMEOUT;
463             $req->{_timeout} = AnyEvent->timer( after => $timeout, cb => sub {
464 2     2   15776 my $req = delete $client->{in_progress}->{$req_id};
465 2         48 $req->{_response} = Beekeeper::JSONRPC::Error->request_timeout;
466 2 50       6 $req->{_on_error_cb}->($req->{_response}) if $req->{_on_error_cb};
467 2         16 $req->{_waiting_response}->end;
468 64         996 });
469              
470 64         2686 bless $req, 'Beekeeper::JSONRPC::Request';
471 64         476 return $req;
472             }
473              
474             sub __create_response_topic {
475 6     6   21 my $self = shift;
476 6         18 my $client = $self->{_CLIENT};
477              
478 6         125 my ($file, $line) = (caller(2))[1,2];
479 6         41 my $at = "at $file line $line\n";
480              
481             # Subscribe to an exclusive topic for receiving RPC responses
482              
483 6         26 my $response_topic = 'priv/' . $self->{_BUS}->{client_id};
484 6         16 $client->{response_topic} = $response_topic;
485              
486             $self->{_BUS}->subscribe(
487             topic => $response_topic,
488             maximum_qos => 0,
489             on_publish => sub {
490 64     64   203 my ($payload_ref, $mqtt_properties) = @_;
491              
492 64         189 local $@;
493 64         159 my $resp = eval { decode_json($$payload_ref) };
  64         1565  
494              
495 64 50 33     836 unless (ref $resp eq 'HASH' && $resp->{jsonrpc} eq '2.0') {
496 0         0 warn "Received invalid JSON-RPC 2.0 message $at";
497 0         0 return;
498             }
499              
500 64 50       233 if (exists $resp->{'id'}) {
501              
502             # Response of an RPC request
503              
504 64         192 my $req_id = $resp->{'id'};
505 64         333 my $req = delete $client->{in_progress}->{$req_id};
506              
507             # Ignore unexpected responses
508 64 100       233 return unless $req;
509              
510             # Cancel request timeout
511 62         605 delete $req->{_timeout};
512              
513 62 100       214 if (exists $resp->{'result'}) {
514             # Success response
515 57         509 $req->{_response} = bless $resp, 'Beekeeper::JSONRPC::Response';
516 57 100       262 $req->{_on_success_cb}->($resp) if $req->{_on_success_cb};
517             }
518             else {
519             # Error response
520 5         42 $req->{_response} = bless $resp, 'Beekeeper::JSONRPC::Error';
521 5 100       20 $req->{_on_error_cb}->($resp) if $req->{_on_error_cb};
522             }
523            
524 62         397 $req->{_waiting_response}->end;
525             }
526             else {
527              
528             # Unicasted notification
529              
530 0         0 bless $resp, 'Beekeeper::JSONRPC::Notification';
531 0         0 $resp->{_headers} = $mqtt_properties;
532              
533 0         0 my $method = $resp->{method};
534              
535 0 0 0     0 unless (defined $method && $method =~ m/^([\.\w-]+)\.([\w-]+)$/) {
536 0         0 warn "Received notification with invalid method '$method' $at";
537 0         0 return;
538             }
539              
540             my $cb = $client->{callbacks}->{"msg.$1.$2"} ||
541 0   0     0 $client->{callbacks}->{"msg.$1.*"};
542              
543 0 0       0 unless ($cb) {
544 0         0 warn "No callback found for received notification '$method' $at";
545 0         0 return;
546             }
547              
548 0         0 $cb->($resp->{params}, $resp);
549             }
550             },
551             on_suback => sub {
552 6     6   21 my ($success, $prop) = @_;
553 6 50       79 die "Could not subscribe to response topic '$response_topic' $at" unless $success;
554             }
555 6         312 );
556              
557 6         86 return $response_topic;
558             }
559              
560             sub wait_async_calls {
561 2     2 1 13 my ($self) = @_;
562              
563             # Wait for all pending async requests
564 2         21 my $cv = delete $self->{_CLIENT}->{async_cv};
565 2 50       9 return unless defined $cv;
566              
567             # Make AnyEvent to allow one level of recursive condvar blocking, as we may
568             # block both in $worker->__work_forever and here
569 2 50       8 $AE_WAITING && Carp::confess "Recursive condvar blocking wait attempted";
570 2         6 local $AE_WAITING = 1;
571 2         4 local $AnyEvent::CondVar::Base::WAITING = 0;
572              
573 2         11 $cv->recv;
574             }
575              
576              
577             sub get_authentication_data {
578 0     0 1 0 my ($self) = @_;
579              
580 0         0 $self->{_CLIENT}->{auth_data};
581             }
582              
583             sub set_authentication_data {
584 0     0 1 0 my ($self, $data) = @_;
585              
586 0         0 $self->{_CLIENT}->{auth_data} = $data;
587             }
588              
589             sub __use_authorization_token {
590 18     18   166 my ($self, $token) = @_;
591              
592             # Using a hashing function makes harder to access the wrong worker pool by mistake,
593             # but it is not an effective access restriction: anyone with access to the backend
594             # bus credentials can easily inspect and clone auth data tokens
595              
596 18         262 my $salt = $self->{_CLIENT}->{auth_salt};
597              
598 18         97 my $adata_ref = \$self->{_CLIENT}->{auth_data};
599              
600 18         217 my $guard = Beekeeper::Client::Guard->new( $adata_ref );
601              
602 18         351 $$adata_ref = md5_base64($token . $salt);
603              
604 18         88 return $guard;
605             }
606              
607             1;
608              
609             package
610             Beekeeper::Client::Guard; # hide from PAUSE
611              
612             sub new {
613 18     18   99 my ($class, $ref) = @_;
614              
615 18         189 bless [$ref, $$ref], $class;
616             }
617              
618             sub DESTROY {
619              
620 18     18   77 ${$_[0]->[0]} = $_[0]->[1];
  18         175  
621             }
622              
623             1;
624              
625             __END__