File Coverage

lib/Beekeeper/Client.pm
Criterion Covered Total %
statement 195 289 67.4
branch 53 112 47.3
condition 14 44 31.8
subroutine 29 37 78.3
pod 10 11 90.9
total 301 493 61.0


line stmt bran cond sub pod time code
1             package Beekeeper::Client;
2              
3 11     11   961437 use strict;
  11         98  
  11         380  
4 11     11   63 use warnings;
  11         25  
  11         510  
5              
6             our $VERSION = '0.09';
7              
8 11     11   4912 use Beekeeper::AnyEvent;
  11         36  
  11         436  
9 11     11   7496 use Beekeeper::MQTT;
  11         43  
  11         672  
10 11     11   5348 use Beekeeper::JSONRPC;
  11         34  
  11         339  
11 11     11   4865 use Beekeeper::Config;
  11         36  
  11         377  
12              
13 11     11   79 use JSON::XS;
  11         21  
  11         555  
14 11     11   6125 use Sys::Hostname;
  11         12042  
  11         622  
15 11     11   89 use Time::HiRes;
  11         23  
  11         85  
16 11     11   1170 use Digest::MD5 'md5_base64';
  11         27  
  11         527  
17 11     11   65 use Carp;
  11         26  
  11         532  
18              
19 11     11   69 use constant QUEUE_LANES => 2;
  11         28  
  11         837  
20 11     11   90 use constant REQ_TIMEOUT => 60;
  11         25  
  11         626  
21              
22 11     11   73 use Exporter 'import';
  11         25  
  11         51400  
23              
24             our @EXPORT_OK = qw(
25             send_notification
26             call_remote
27             call_remote_async
28             fire_remote
29             wait_async_calls
30             get_authentication_data
31             set_authentication_data
32              
33             __do_rpc_request
34             __create_response_topic
35             __use_authorization_token
36             );
37              
38             our %EXPORT_TAGS = ('worker' => \@EXPORT_OK );
39              
40             our $singleton;
41              
42              
43             sub new {
44 6     6 0 93 my ($class, %args) = @_;
45              
46 6         215 my $self = {
47             _CLIENT => undef,
48             _BUS => undef,
49             };
50              
51             $self->{_CLIENT} = {
52 6         549 forward_to => undef,
53             response_topic => undef,
54             in_progress => undef,
55             curr_request => undef,
56             caller_id => undef,
57             caller_addr => undef,
58             auth_data => undef,
59             auth_salt => undef,
60             async_cv => undef,
61             correlation_id => 1,
62             callbacks => {},
63             };
64              
65 6 50 33     361 unless (exists $args{'username'} && exists $args{'password'}) {
66              
67             # Get broker connection parameters from config file
68              
69 6         64 my $bus_id = $args{'bus_id'};
70              
71 6 50       119 if (defined $bus_id) {
72             # Use parameters for specific bus
73 0         0 my $config = Beekeeper::Config->get_bus_config( bus_id => $bus_id );
74 0 0       0 croak "Bus '$bus_id' is not defined into config file bus.config.json" unless $config;
75 0         0 %args = ( %$config, %args );
76             }
77             else {
78 6         298 my $config = Beekeeper::Config->get_bus_config( bus_id => '*');
79 6 50       100 if (scalar(keys %$config) == 1) {
80             # Use the only config present
81 6         47 ($bus_id) = (keys %$config);
82 6         51 %args = ( %{$config->{$bus_id}}, bus_id => $bus_id, %args );
  6         134  
83             }
84             else {
85             # Use default parameters (if any)
86 0         0 my ($default) = grep { $config->{$_}->{default} } keys %$config;
  0         0  
87 0 0       0 croak "No default bus defined into config file bus.config.json" unless $default;
88 0         0 $bus_id = $config->{$default}->{'bus_id'};
89 0         0 %args = ( %{$config->{$default}}, bus_id => $bus_id, %args );
  0         0  
90             }
91             }
92             }
93              
94 6         59 $self->{_CLIENT}->{forward_to} = delete $args{'forward_to'};
95 6   33     128 $self->{_CLIENT}->{auth_salt} = delete $args{'auth_salt'} || $args{'bus_id'};
96              
97             # Start a fresh new MQTT session on connect
98 6         78 $args{'clean_start'} = 1;
99              
100             # Make the MQTT session ends when the connection is closed
101 6         69 $args{'session_expiry_interval'} = 0;
102              
103             # Keep only 1 unacked message (of QoS 1) in flight
104 6         51 $args{'receive_maximum'} = 1;
105              
106             # Do not use topic aliases
107 6         68 $args{'topic_alias_maximum'} = 0;
108              
109              
110 6         295 $self->{_BUS} = Beekeeper::MQTT->new( %args );
111              
112             # Connect to MQTT broker
113 6         113 $self->{_BUS}->connect( blocking => 1 );
114              
115 6         74 bless $self, $class;
116 6         57 return $self;
117             }
118              
119             sub instance {
120 31     31 1 4292163 my $class = shift;
121              
122 31 100       605 if ($singleton) {
123             # Return existing singleton
124 25         458 return $singleton;
125             }
126              
127             # Create a new instance
128 6         191 my $self = $class->new( @_ );
129              
130             # Keep a global reference to $self
131 6         17 $singleton = $self;
132              
133 6         28 return $self;
134             }
135              
136              
137             sub send_notification {
138 5     5 1 953147 my ($self, %args) = @_;
139              
140 5 50       44 my $fq_meth = $args{'method'} or croak "Method was not specified";
141              
142 5 50       38 $fq_meth .= '@' . $args{'address'} if (defined $args{'address'});
143              
144 5 50       104 $fq_meth =~ m/^ ( [\w-]+ (?:\.[\w-]+)* )
145             \. ( [\w-]+ )
146             (?: \@ ( [\w-]+ ) (\.[\w-]+)* )? $/x or croak "Invalid method '$fq_meth'";
147              
148 5         51 my ($service, $method, $remote_bus, $addr) = ($1, $2, $3, $4);
149              
150             my $json = encode_json({
151             jsonrpc => '2.0',
152             method => "$service.$method",
153 5         85 params => $args{'params'},
154             });
155              
156 5         19 my %send_args;
157              
158 5         26 my $local_bus = $self->{_BUS}->{bus_role};
159              
160 5 50       118 $remote_bus = $self->{_CLIENT}->{forward_to} unless (defined $remote_bus);
161              
162 5 50       28 if (defined $remote_bus) {
163              
164 0         0 $send_args{'topic'} = "msg/$remote_bus-" . int( rand(QUEUE_LANES) + 1 );
165 0         0 $send_args{'topic'} =~ tr|.|/|;
166              
167 0         0 $send_args{'fwd_to'} = "msg/$remote_bus/$service/$method";
168 0 0 0     0 $send_args{'fwd_to'} .= "\@$addr" if (defined $addr && $addr =~ s/^\.//);
169 0         0 $send_args{'fwd_to'} =~ tr|.|/|;
170             }
171             else {
172 5         43 $send_args{'topic'} = "msg/$local_bus/$service/$method";
173 5         27 $send_args{'topic'} =~ tr|.|/|;
174             }
175              
176 5 50       37 $send_args{'auth'} = $self->{_CLIENT}->{auth_data} if defined $self->{_CLIENT}->{auth_data};
177 5 50       33 $send_args{'clid'} = $self->{_CLIENT}->{caller_id} if defined $self->{_CLIENT}->{caller_id};
178              
179 5 50       25 if (exists $args{'buffer_id'}) {
180 0         0 $send_args{'buffer_id'} = $args{'buffer_id'};
181             }
182              
183 5         43 $self->{_BUS}->publish( payload => \$json, %send_args );
184             }
185              
186              
187             sub accept_notifications {
188 0     0 1 0 my ($self, %args) = @_;
189              
190 0         0 my ($file, $line) = (caller)[1,2];
191 0         0 my $at = "at $file line $line\n";
192              
193 0         0 my $callbacks = $self->{_CLIENT}->{callbacks};
194              
195 0         0 foreach my $fq_meth (keys %args) {
196              
197 0 0       0 $fq_meth =~ m/^ ( [\w-]+ (?: \.[\w-]+ )* )
198             \. ( [\w-]+ | \* ) $/x or croak "Invalid notification method '$fq_meth'";
199              
200 0         0 my ($service, $method) = ($1, $2);
201              
202 0         0 my $callback = $args{$fq_meth};
203              
204 0 0       0 unless (ref $callback eq 'CODE') {
205 0         0 croak "Invalid callback for '$fq_meth'";
206             }
207              
208 0 0       0 croak "Already accepting notifications '$fq_meth'" if exists $callbacks->{"msg.$fq_meth"};
209 0         0 $callbacks->{"msg.$fq_meth"} = $callback;
210              
211             #TODO: Allow to accept private notifications without subscribing
212              
213 0         0 my $local_bus = $self->{_BUS}->{bus_role};
214              
215 0         0 my $topic = "msg/$local_bus/$service/$method";
216 0         0 $topic =~ tr|.*|/#|;
217              
218             $self->{_BUS}->subscribe(
219             topic => $topic,
220             on_publish => sub {
221 0     0   0 my ($payload_ref, $mqtt_properties) = @_;
222              
223 0         0 local $@;
224 0         0 my $request = eval { decode_json($$payload_ref) };
  0         0  
225              
226 0 0 0     0 unless (ref $request eq 'HASH' && $request->{jsonrpc} eq '2.0') {
227 0         0 warn "Received invalid JSON-RPC 2.0 notification $at";
228 0         0 return;
229             }
230              
231 0         0 bless $request, 'Beekeeper::JSONRPC::Notification';
232 0         0 $request->{_mqtt_properties} = $mqtt_properties;
233              
234 0         0 my $method = $request->{method};
235              
236 0 0 0     0 unless (defined $method && $method =~ m/^([\.\w-]+)\.([\w-]+)$/) {
237 0         0 warn "Received notification with invalid method '$method' $at";
238 0         0 return;
239             }
240              
241             my $cb = $callbacks->{"msg.$1.$2"} ||
242 0   0     0 $callbacks->{"msg.$1.*"};
243              
244 0 0       0 unless ($cb) {
245 0         0 warn "No callback found for received notification '$method' $at";
246 0         0 return;
247             }
248              
249 0         0 $cb->($request->{params}, $request);
250             },
251             on_suback => sub {
252 0     0   0 my ($success, $prop) = @_;
253 0 0       0 die "Could not subscribe to topic '$topic' $at" unless $success;
254             }
255 0         0 );
256             }
257             }
258              
259              
260             sub stop_accepting_notifications {
261 0     0 1 0 my ($self, @methods) = @_;
262              
263 0         0 my ($file, $line) = (caller)[1,2];
264 0         0 my $at = "at $file line $line\n";
265              
266 0 0       0 croak "No method specified" unless @methods;
267              
268 0         0 foreach my $fq_meth (@methods) {
269              
270 0 0       0 $fq_meth =~ m/^ ( [\w-]+ (?: \.[\w-]+ )* )
271             \. ( [\w-]+ | \* ) $/x or croak "Invalid method '$fq_meth'";
272              
273 0         0 my ($service, $method) = ($1, $2);
274              
275 0 0       0 unless (defined $self->{_CLIENT}->{callbacks}->{"msg.$fq_meth"}) {
276 0         0 carp "Not previously accepting notifications '$fq_meth'";
277 0         0 next;
278             }
279              
280 0         0 my $local_bus = $self->{_BUS}->{bus_role};
281              
282 0         0 my $topic = "msg/$local_bus/$service/$method";
283 0         0 $topic =~ tr|.*|/#|;
284              
285             $self->{_BUS}->unsubscribe(
286             topic => $topic,
287             on_unsuback => sub {
288 0     0   0 my ($success, $prop) = @_;
289              
290 0 0       0 die "Could not unsubscribe from topic '$topic' $at" unless $success;
291              
292 0         0 delete $self->{_CLIENT}->{callbacks}->{"msg.$fq_meth"};
293             },
294 0         0 );
295             }
296             }
297              
298              
299             our $AE_WAITING;
300              
301             sub call_remote {
302 52     52 1 5829 my $self = shift;
303              
304 52         412 my $req = $self->__do_rpc_request( @_, req_type => 'SYNCHRONOUS' );
305              
306             # Make AnyEvent allow one level of recursive condvar blocking, as we may
307             # block both in $worker->__work_forever and in $client->__do_rpc_request
308 51 50       215 $AE_WAITING && Carp::confess "Recursive blocking call attempted: " .
309             "trying to make a call_remote while another call_remote is still in progress, " .
310             "but it is not possible to make two blocking calls simultaneously " .
311             "(tip: one of the two calls must be made with call_remote_async)";
312              
313 51         170 local $AE_WAITING = 1;
314 51         154 local $AnyEvent::CondVar::Base::WAITING = 0;
315              
316             # Block until a response is received or request timed out
317 51         385 $req->{_waiting_response}->recv;
318              
319 51         2457 my $resp = $req->{_response};
320              
321 51 100 100     357 if (!exists $resp->{result} && $req->{_raise_error}) {
322 5         20 my $errmsg = $resp->code . " " . $resp->message;
323 5         1291 croak "Call to '$req->{method}' failed: $errmsg";
324             }
325              
326 46         629 return $resp;
327             }
328              
329             sub call_remote_async {
330 13     13 1 734 my $self = shift;
331              
332 13         36 my $req = $self->__do_rpc_request( @_, req_type => 'ASYNCHRONOUS' );
333              
334 13         67 return $req;
335             }
336              
337             sub fire_remote {
338 3     3 1 107 my $self = shift;
339              
340             # Send request to a worker, but do not wait for response
341 3         13 $self->__do_rpc_request( @_, req_type => 'FIRE_FORGET' );
342              
343 3         15 return;
344             }
345              
346             my $__now = 0;
347              
348             sub __do_rpc_request {
349 68     68   923 my ($self, %args) = @_;
350 68         255 my $client = $self->{_CLIENT};
351              
352 68 50       289 my $fq_meth = $args{'method'} or croak "Method was not specified";
353              
354 68 50       270 $fq_meth .= '@' . $args{'address'} if (defined $args{'address'});
355              
356 68 100       1481 $fq_meth =~ m/^ ( [\w-]+ (?:\.[\w-]+)* )
357             \. ( [\w-]+ )
358             (?: \@ ( [\w-]+ ) (\.[\w-]+)* )? $/x or croak "Invalid method '$fq_meth'";
359              
360 67         932 my ($service, $method, $remote_bus, $addr) = ($1, $2, $3, $4);
361              
362 67         188 my %send_args;
363              
364 67         255 my $local_bus = $self->{_BUS}->{bus_role};
365              
366 67 50       276 $remote_bus = $client->{forward_to} unless (defined $remote_bus);
367              
368             # Local bus request sent to: req/{local_bus}/{service_class}
369             # Remote bus request sent to: req/{remote_bus}
370              
371 67 50       222 if (defined $remote_bus) {
372              
373 0         0 $send_args{'topic'} = "req/$remote_bus-" . int( rand(QUEUE_LANES) + 1 );
374 0         0 $send_args{'topic'} =~ tr|.|/|;
375              
376 0         0 $send_args{'fwd_to'} = "req/$remote_bus/$service";
377 0 0 0     0 $send_args{'fwd_to'} .= "\@$addr" if (defined $addr && $addr =~ s/^\.//);
378 0         0 $send_args{'fwd_to'} =~ tr|.|/|;
379             }
380             else {
381 67         496 $send_args{'topic'} = "req/$local_bus/$service";
382 67         317 $send_args{'topic'} =~ tr|.|/|;
383             }
384              
385 67 100       354 $send_args{'auth'} = $client->{auth_data} if defined $client->{auth_data};
386 67 50       243 $send_args{'clid'} = $client->{caller_id} if defined $client->{caller_id};
387              
388 67         352 my $FIRE_FORGET = $args{req_type} eq 'FIRE_FORGET';
389 67         246 my $SYNCHRONOUS = $args{req_type} eq 'SYNCHRONOUS';
390 67         254 my $raise_error = $args{'raise_error'};
391 67         138 my $req_id;
392              
393             # JSON-RPC call
394             my $req = {
395             jsonrpc => '2.0',
396             method => "$service.$method",
397 67         589 params => $args{'params'},
398             };
399              
400             # Reuse or create a private topic which will receive responses
401             $send_args{'response_topic'} = $client->{response_topic} ||
402 67   66     391 $self->__create_response_topic;
403              
404 67 100       11170 unless ($FIRE_FORGET) {
405             # Assign an unique request id (unique only for this client)
406 64         5420 $req_id = $client->{correlation_id}++;
407 64         255 $req->{'id'} = $req_id;
408             }
409              
410 67         912 my $json = encode_json($req);
411              
412 67 50       269 if (exists $args{'buffer_id'}) {
413 0         0 $send_args{'buffer_id'} = $args{'buffer_id'};
414             }
415              
416             # Send request
417             $self->{_BUS}->publish(
418 67         5841 payload => \$json,
419             qos => 1,
420             %send_args,
421             );
422              
423 67 100       344 if ($FIRE_FORGET) {
    100          
424             # Nothing else to do
425 3         23 return;
426             }
427             elsif ($SYNCHRONOUS) {
428              
429 51 100       292 $req->{_raise_error} = (defined $raise_error) ? $raise_error : 1;
430              
431             # Wait until a response is received in the reply queue
432 51         3395 $req->{_waiting_response} = AnyEvent->condvar;
433 51         969 $req->{_waiting_response}->begin;
434             }
435             else {
436              
437 13         29 $req->{_on_success_cb} = $args{'on_success'};
438 13         38 $req->{_on_error_cb} = $args{'on_error'};
439              
440 13 50 33     35 if ($raise_error && !$req->{_on_error_cb}) {
441             $req->{_on_error_cb} = sub {
442 0     0   0 my $errmsg = $_[0]->code . " " . $_[0]->message;
443 0         0 croak "Call to '$service.$method' failed: $errmsg";
444 0         0 };
445             }
446              
447             # Use shared cv for all requests
448 13 100 100     87 if (!$client->{async_cv} || $client->{async_cv}->ready) {
449 4         135 $client->{async_cv} = AnyEvent->condvar;
450             }
451              
452 13         84 $req->{_waiting_response} = $client->{async_cv};
453 13         42 $req->{_waiting_response}->begin;
454             }
455              
456 64         1213 $client->{in_progress}->{$req_id} = $req;
457              
458             # Ensure that timeout is set properly when the event loop was blocked
459 64 100       294 if ($__now != time) { $__now = time; AnyEvent->now_update }
  24         66  
  24         256  
460              
461             # Request timeout timer
462 64   100     948 my $timeout = $args{'timeout'} || REQ_TIMEOUT;
463             $req->{_timeout} = AnyEvent->timer( after => $timeout, cb => sub {
464 2     2   15100 my $req = delete $client->{in_progress}->{$req_id};
465 2         27 $req->{_response} = Beekeeper::JSONRPC::Error->request_timeout;
466 2 50       11 $req->{_on_error_cb}->($req->{_response}) if $req->{_on_error_cb};
467 2         10 $req->{_waiting_response}->end;
468 64         1030 });
469              
470 64         3126 bless $req, 'Beekeeper::JSONRPC::Request';
471 64         590 return $req;
472             }
473              
474             sub __create_response_topic {
475 6     6   16 my $self = shift;
476 6         24 my $client = $self->{_CLIENT};
477              
478 6         113 my ($file, $line) = (caller(2))[1,2];
479 6         55 my $at = "at $file line $line\n";
480              
481             # Subscribe to an exclusive topic for receiving RPC responses
482              
483 6         25 my $response_topic = 'priv/' . $self->{_BUS}->{client_id};
484 6         15 $client->{response_topic} = $response_topic;
485              
486             $self->{_BUS}->subscribe(
487             topic => $response_topic,
488             maximum_qos => 0,
489             on_publish => sub {
490 64     64   245 my ($payload_ref, $mqtt_properties) = @_;
491              
492 64         210 local $@;
493 64         177 my $resp = eval { decode_json($$payload_ref) };
  64         1717  
494              
495 64 50 33     862 unless (ref $resp eq 'HASH' && $resp->{jsonrpc} eq '2.0') {
496 0         0 warn "Received invalid JSON-RPC 2.0 message $at";
497 0         0 return;
498             }
499              
500 64 50       281 if (exists $resp->{'id'}) {
501              
502             # Response of an RPC request
503              
504 64         239 my $req_id = $resp->{'id'};
505 64         283 my $req = delete $client->{in_progress}->{$req_id};
506              
507             # Ignore unexpected responses
508 64 100       226 return unless $req;
509              
510             # Cancel request timeout
511 62         695 delete $req->{_timeout};
512              
513 62 100       196 if (exists $resp->{'result'}) {
514             # Success response
515 57         470 $req->{_response} = bless $resp, 'Beekeeper::JSONRPC::Response';
516 57 100       260 $req->{_on_success_cb}->($resp) if $req->{_on_success_cb};
517             }
518             else {
519             # Error response
520 5         31 $req->{_response} = bless $resp, 'Beekeeper::JSONRPC::Error';
521 5 100       20 $req->{_on_error_cb}->($resp) if $req->{_on_error_cb};
522             }
523            
524 62         476 $req->{_waiting_response}->end;
525             }
526             else {
527              
528             # Unicasted notification
529              
530 0         0 bless $resp, 'Beekeeper::JSONRPC::Notification';
531 0         0 $resp->{_headers} = $mqtt_properties;
532              
533 0         0 my $method = $resp->{method};
534              
535 0 0 0     0 unless (defined $method && $method =~ m/^([\.\w-]+)\.([\w-]+)$/) {
536 0         0 warn "Received notification with invalid method '$method' $at";
537 0         0 return;
538             }
539              
540             my $cb = $client->{callbacks}->{"msg.$1.$2"} ||
541 0   0     0 $client->{callbacks}->{"msg.$1.*"};
542              
543 0 0       0 unless ($cb) {
544 0         0 warn "No callback found for received notification '$method' $at";
545 0         0 return;
546             }
547              
548 0         0 $cb->($resp->{params}, $resp);
549             }
550             },
551             on_suback => sub {
552 6     6   21 my ($success, $prop) = @_;
553 6 50       79 die "Could not subscribe to response topic '$response_topic' $at" unless $success;
554             }
555 6         320 );
556              
557 6         55 return $response_topic;
558             }
559              
560             sub wait_async_calls {
561 2     2 1 16 my ($self) = @_;
562              
563             # Wait for all pending async requests
564 2         9 my $cv = delete $self->{_CLIENT}->{async_cv};
565 2 50       7 return unless defined $cv;
566              
567             # Make AnyEvent to allow one level of recursive condvar blocking, as we may
568             # block both in $worker->__work_forever and here
569 2 50       7 $AE_WAITING && Carp::confess "Recursive condvar blocking wait attempted";
570 2         32 local $AE_WAITING = 1;
571 2         4 local $AnyEvent::CondVar::Base::WAITING = 0;
572              
573 2         10 $cv->recv;
574             }
575              
576              
577             sub get_authentication_data {
578 0     0 1 0 my ($self) = @_;
579              
580 0         0 $self->{_CLIENT}->{auth_data};
581             }
582              
583             sub set_authentication_data {
584 0     0 1 0 my ($self, $data) = @_;
585              
586 0         0 $self->{_CLIENT}->{auth_data} = $data;
587             }
588              
589             sub __use_authorization_token {
590 18     18   184 my ($self, $token) = @_;
591              
592             # Using a hashing function makes harder to access the wrong worker pool by mistake,
593             # but it is not an effective access restriction: anyone with access to the backend
594             # bus credentials can easily inspect and clone auth data tokens
595              
596 18         292 my $salt = $self->{_CLIENT}->{auth_salt};
597              
598 18         171 my $adata_ref = \$self->{_CLIENT}->{auth_data};
599              
600 18         268 my $guard = Beekeeper::Client::Guard->new( $adata_ref );
601              
602 18         470 $$adata_ref = md5_base64($token . $salt);
603              
604 18         107 return $guard;
605             }
606              
607             1;
608              
609             package
610             Beekeeper::Client::Guard; # hide from PAUSE
611              
612             sub new {
613 18     18   127 my ($class, $ref) = @_;
614              
615 18         266 bless [$ref, $$ref], $class;
616             }
617              
618             sub DESTROY {
619              
620 18     18   75 ${$_[0]->[0]} = $_[0]->[1];
  18         182  
621             }
622              
623             1;
624              
625             __END__