File Coverage

lib/Beekeeper/Client.pm
Criterion Covered Total %
statement 195 293 66.5
branch 54 114 47.3
condition 14 44 31.8
subroutine 28 36 77.7
pod 10 11 90.9
total 301 498 60.4


line stmt bran cond sub pod time code
1             package Beekeeper::Client;
2              
3 11     11   920002 use strict;
  11         111  
  11         315  
4 11     11   71 use warnings;
  11         27  
  11         505  
5              
6             our $VERSION = '0.10';
7              
8 11     11   4637 use Beekeeper::AnyEvent;
  11         31  
  11         404  
9 11     11   7038 use Beekeeper::MQTT;
  11         43  
  11         562  
10 11     11   4605 use Beekeeper::JSONRPC;
  11         28  
  11         370  
11 11     11   4219 use Beekeeper::Config;
  11         27  
  11         438  
12              
13 11     11   75 use JSON::XS;
  11         20  
  11         515  
14 11     11   7988 use Compress::Raw::Zlib ();
  11         62612  
  11         352  
15 11     11   98 use Digest::MD5 'md5_base64';
  11         23  
  11         631  
16 11     11   121 use Carp;
  11         27  
  11         597  
17              
18 11     11   76 use constant QUEUE_LANES => 2;
  11         21  
  11         639  
19 11     11   77 use constant REQ_TIMEOUT => 60;
  11         20  
  11         645  
20              
21 11     11   78 use Exporter 'import';
  11         22  
  11         52618  
22              
23             our @EXPORT_OK = qw(
24             send_notification
25             call_remote
26             call_remote_async
27             fire_remote
28             wait_async_calls
29             get_authentication_data
30             set_authentication_data
31              
32             __do_rpc_request
33             __create_response_topic
34             __use_authorization_token
35             );
36              
37             our %EXPORT_TAGS = ('worker' => \@EXPORT_OK );
38              
39             our $singleton;
40             my $INFLATE;
41              
42              
43             sub new {
44 6     6 0 106 my ($class, %args) = @_;
45              
46 6         149 my $self = {
47             _CLIENT => undef,
48             _BUS => undef,
49             };
50              
51             $self->{_CLIENT} = {
52 6         492 forward_to => undef,
53             response_topic => undef,
54             in_progress => undef,
55             curr_request => undef,
56             caller_id => undef,
57             caller_addr => undef,
58             auth_data => undef,
59             auth_salt => undef,
60             async_cv => undef,
61             correlation_id => 1,
62             callbacks => {},
63             };
64              
65 6 50 33     263 unless (exists $args{'username'} && exists $args{'password'}) {
66              
67             # Get broker connection parameters from config file
68              
69 6         73 my $bus_id = $args{'bus_id'};
70              
71 6 50       107 if (defined $bus_id) {
72             # Use parameters for specific bus
73 0         0 my $config = Beekeeper::Config->get_bus_config( bus_id => $bus_id );
74 0 0       0 croak "Bus '$bus_id' is not defined into config file bus.config.json" unless $config;
75 0         0 %args = ( %$config, %args );
76             }
77             else {
78 6         245 my $config = Beekeeper::Config->get_bus_config( bus_id => '*');
79 6 50       73 if (scalar(keys %$config) == 1) {
80             # Use the only config present
81 6         52 ($bus_id) = (keys %$config);
82 6         29 %args = ( %{$config->{$bus_id}}, bus_id => $bus_id, %args );
  6         103  
83             }
84             else {
85             # Use default parameters (if any)
86 0         0 my ($default) = grep { $config->{$_}->{default} } keys %$config;
  0         0  
87 0 0       0 croak "No default bus defined into config file bus.config.json" unless $default;
88 0         0 $bus_id = $config->{$default}->{'bus_id'};
89 0         0 %args = ( %{$config->{$default}}, bus_id => $bus_id, %args );
  0         0  
90             }
91             }
92             }
93              
94 6         259 $INFLATE = Compress::Raw::Zlib::Inflate->new( -ConsumeInput => 0 );
95              
96 6         5180 $self->{_CLIENT}->{forward_to} = delete $args{'forward_to'};
97 6   33     146 $self->{_CLIENT}->{auth_salt} = delete $args{'auth_salt'} || $args{'bus_id'};
98              
99             # Start a fresh new MQTT session on connect
100 6         64 $args{'clean_start'} = 1;
101              
102             # Make the MQTT session ends when the connection is closed
103 6         40 $args{'session_expiry_interval'} = 0;
104              
105             # Keep only 1 unacked message (of QoS 1) in flight
106 6         113 $args{'receive_maximum'} = 1;
107              
108             # Do not use topic aliases
109 6         64 $args{'topic_alias_maximum'} = 0;
110              
111              
112 6         239 $self->{_BUS} = Beekeeper::MQTT->new( %args );
113              
114             # Connect to MQTT broker
115 6         129 $self->{_BUS}->connect( blocking => 1 );
116              
117 6         65 bless $self, $class;
118 6         48 return $self;
119             }
120              
121             sub instance {
122 31     31 1 3439845 my $class = shift;
123              
124 31 100       552 if ($singleton) {
125             # Return existing singleton
126 25         392 return $singleton;
127             }
128              
129             # Create a new instance
130 6         159 my $self = $class->new( @_ );
131              
132             # Keep a global reference to $self
133 6         13 $singleton = $self;
134              
135 6         18 return $self;
136             }
137              
138              
139             sub send_notification {
140 5     5 1 1998866 my ($self, %args) = @_;
141              
142 5 50       54 my $fq_meth = $args{'method'} or croak "Method was not specified";
143              
144 5 50       38 $fq_meth .= '@' . $args{'address'} if (defined $args{'address'});
145              
146 5 50       103 $fq_meth =~ m/^ ( [\w-]+ (?:\.[\w-]+)* )
147             \. ( [\w-]+ )
148             (?: \@ ( [\w-]+ ) (\.[\w-]+)* )? $/x or croak "Invalid method '$fq_meth'";
149              
150 5         53 my ($service, $method, $remote_bus, $addr) = ($1, $2, $3, $4);
151              
152             my $json = encode_json({
153             jsonrpc => '2.0',
154             method => "$service.$method",
155 5         98 params => $args{'params'},
156             });
157              
158 5         32 my %send_args;
159              
160 5         26 my $local_bus = $self->{_BUS}->{bus_role};
161              
162 5 50       115 $remote_bus = $self->{_CLIENT}->{forward_to} unless (defined $remote_bus);
163              
164 5 50       23 if (defined $remote_bus) {
165              
166 0         0 $send_args{'topic'} = "msg/$remote_bus-" . int( rand(QUEUE_LANES) + 1 );
167 0         0 $send_args{'topic'} =~ tr|.|/|;
168              
169 0         0 $send_args{'fwd_to'} = "msg/$remote_bus/$service/$method";
170 0 0 0     0 $send_args{'fwd_to'} .= "\@$addr" if (defined $addr && $addr =~ s/^\.//);
171 0         0 $send_args{'fwd_to'} =~ tr|.|/|;
172             }
173             else {
174 5         35 $send_args{'topic'} = "msg/$local_bus/$service/$method";
175 5         25 $send_args{'topic'} =~ tr|.|/|;
176             }
177              
178 5 50       35 $send_args{'auth'} = $self->{_CLIENT}->{auth_data} if defined $self->{_CLIENT}->{auth_data};
179 5 50       34 $send_args{'clid'} = $self->{_CLIENT}->{caller_id} if defined $self->{_CLIENT}->{caller_id};
180              
181 5 50       34 if (exists $args{'buffer_id'}) {
182 0         0 $send_args{'buffer_id'} = $args{'buffer_id'};
183             }
184              
185 5         51 $self->{_BUS}->publish( payload => \$json, %send_args );
186             }
187              
188              
189             sub accept_notifications {
190 0     0 1 0 my ($self, %args) = @_;
191              
192 0         0 my ($file, $line) = (caller)[1,2];
193 0         0 my $at = "at $file line $line\n";
194              
195 0         0 my $callbacks = $self->{_CLIENT}->{callbacks};
196              
197 0         0 foreach my $fq_meth (keys %args) {
198              
199 0 0       0 $fq_meth =~ m/^ ( [\w-]+ (?: \.[\w-]+ )* )
200             \. ( [\w-]+ | \* ) $/x or croak "Invalid notification method '$fq_meth'";
201              
202 0         0 my ($service, $method) = ($1, $2);
203              
204 0         0 my $callback = $args{$fq_meth};
205              
206 0 0       0 unless (ref $callback eq 'CODE') {
207 0         0 croak "Invalid callback for '$fq_meth'";
208             }
209              
210 0 0       0 croak "Already accepting notifications '$fq_meth'" if exists $callbacks->{"msg.$fq_meth"};
211 0         0 $callbacks->{"msg.$fq_meth"} = $callback;
212              
213             #TODO: Allow to accept private notifications without subscribing
214              
215 0         0 my $local_bus = $self->{_BUS}->{bus_role};
216              
217 0         0 my $topic = "msg/$local_bus/$service/$method";
218 0         0 $topic =~ tr|.*|/#|;
219              
220             $self->{_BUS}->subscribe(
221             topic => $topic,
222             on_publish => sub {
223 0     0   0 my ($payload_ref, $mqtt_properties) = @_;
224              
225 0         0 local $@;
226 0         0 my $request = eval { decode_json($$payload_ref) };
  0         0  
227              
228 0 0 0     0 unless (ref $request eq 'HASH' && $request->{jsonrpc} eq '2.0') {
229 0         0 warn "Received invalid JSON-RPC 2.0 notification $at";
230 0         0 return;
231             }
232              
233 0         0 bless $request, 'Beekeeper::JSONRPC::Notification';
234 0         0 $request->{_mqtt_properties} = $mqtt_properties;
235              
236 0         0 my $method = $request->{method};
237              
238 0 0 0     0 unless (defined $method && $method =~ m/^([\.\w-]+)\.([\w-]+)$/) {
239 0         0 warn "Received notification with invalid method '$method' $at";
240 0         0 return;
241             }
242              
243             my $cb = $callbacks->{"msg.$1.$2"} ||
244 0   0     0 $callbacks->{"msg.$1.*"};
245              
246 0 0       0 unless ($cb) {
247 0         0 warn "No callback found for received notification '$method' $at";
248 0         0 return;
249             }
250              
251 0         0 $cb->($request->{params}, $request);
252             },
253             on_suback => sub {
254 0     0   0 my ($success, $prop) = @_;
255 0 0       0 die "Could not subscribe to topic '$topic' $at" unless $success;
256             }
257 0         0 );
258             }
259             }
260              
261              
262             sub stop_accepting_notifications {
263 0     0 1 0 my ($self, @methods) = @_;
264              
265 0         0 my ($file, $line) = (caller)[1,2];
266 0         0 my $at = "at $file line $line\n";
267              
268 0 0       0 croak "No method specified" unless @methods;
269              
270 0         0 foreach my $fq_meth (@methods) {
271              
272 0 0       0 $fq_meth =~ m/^ ( [\w-]+ (?: \.[\w-]+ )* )
273             \. ( [\w-]+ | \* ) $/x or croak "Invalid method '$fq_meth'";
274              
275 0         0 my ($service, $method) = ($1, $2);
276              
277 0 0       0 unless (defined $self->{_CLIENT}->{callbacks}->{"msg.$fq_meth"}) {
278 0         0 carp "Not previously accepting notifications '$fq_meth'";
279 0         0 next;
280             }
281              
282 0         0 my $local_bus = $self->{_BUS}->{bus_role};
283              
284 0         0 my $topic = "msg/$local_bus/$service/$method";
285 0         0 $topic =~ tr|.*|/#|;
286              
287             $self->{_BUS}->unsubscribe(
288             topic => $topic,
289             on_unsuback => sub {
290 0     0   0 my ($success, $prop) = @_;
291              
292 0 0       0 die "Could not unsubscribe from topic '$topic' $at" unless $success;
293              
294 0         0 delete $self->{_CLIENT}->{callbacks}->{"msg.$fq_meth"};
295             },
296 0         0 );
297             }
298             }
299              
300              
301             our $AE_WAITING;
302              
303             sub call_remote {
304 52     52 1 5402 my $self = shift;
305              
306 52         391 my $req = $self->__do_rpc_request( @_, req_type => 'SYNCHRONOUS' );
307              
308             # Make AnyEvent allow one level of recursive condvar blocking, as we may
309             # block both in $worker->__work_forever and in $client->__do_rpc_request
310 51 50       188 $AE_WAITING && Carp::confess "Recursive blocking call attempted: " .
311             "trying to make a call_remote while another call_remote is still in progress, " .
312             "but it is not possible to make two blocking calls simultaneously " .
313             "(tip: one of the two calls must be made with call_remote_async)";
314              
315 51         149 local $AE_WAITING = 1;
316 51         170 local $AnyEvent::CondVar::Base::WAITING = 0;
317              
318             # Block until a response is received or request timed out
319 51         349 $req->{_waiting_response}->recv;
320              
321 51         2410 my $resp = $req->{_response};
322              
323 51 100 100     1245 if (!exists $resp->{result} && $req->{_raise_error}) {
324 5         19 my $errmsg = $resp->code . " " . $resp->message;
325 5         1204 croak "Call to '$req->{method}' failed: $errmsg";
326             }
327              
328 46         573 return $resp;
329             }
330              
331             sub call_remote_async {
332 13     13 1 750 my $self = shift;
333              
334 13         51 my $req = $self->__do_rpc_request( @_, req_type => 'ASYNCHRONOUS' );
335              
336 13         33 return $req;
337             }
338              
339             sub fire_remote {
340 3     3 1 70 my $self = shift;
341              
342             # Send request to a worker, but do not wait for response
343 3         13 $self->__do_rpc_request( @_, req_type => 'FIRE_FORGET' );
344              
345 3         11 return;
346             }
347              
348             my $__now = 0;
349              
350             sub __do_rpc_request {
351 68     68   829 my ($self, %args) = @_;
352 68         287 my $client = $self->{_CLIENT};
353              
354 68 50       334 my $fq_meth = $args{'method'} or croak "Method was not specified";
355              
356 68 50       266 $fq_meth .= '@' . $args{'address'} if (defined $args{'address'});
357              
358 68 100       1407 $fq_meth =~ m/^ ( [\w-]+ (?:\.[\w-]+)* )
359             \. ( [\w-]+ )
360             (?: \@ ( [\w-]+ ) (\.[\w-]+)* )? $/x or croak "Invalid method '$fq_meth'";
361              
362 67         845 my ($service, $method, $remote_bus, $addr) = ($1, $2, $3, $4);
363              
364 67         174 my %send_args;
365              
366 67         312 my $local_bus = $self->{_BUS}->{bus_role};
367              
368 67 50       299 $remote_bus = $client->{forward_to} unless (defined $remote_bus);
369              
370             # Local bus request sent to: req/{local_bus}/{service_class}
371             # Remote bus request sent to: req/{remote_bus}
372              
373 67 50       266 if (defined $remote_bus) {
374              
375 0         0 $send_args{'topic'} = "req/$remote_bus-" . int( rand(QUEUE_LANES) + 1 );
376 0         0 $send_args{'topic'} =~ tr|.|/|;
377              
378 0         0 $send_args{'fwd_to'} = "req/$remote_bus/$service";
379 0 0 0     0 $send_args{'fwd_to'} .= "\@$addr" if (defined $addr && $addr =~ s/^\.//);
380 0         0 $send_args{'fwd_to'} =~ tr|.|/|;
381             }
382             else {
383 67         412 $send_args{'topic'} = "req/$local_bus/$service";
384 67         265 $send_args{'topic'} =~ tr|.|/|;
385             }
386              
387 67 100       314 $send_args{'auth'} = $client->{auth_data} if defined $client->{auth_data};
388 67 50       247 $send_args{'clid'} = $client->{caller_id} if defined $client->{caller_id};
389              
390 67         271 my $FIRE_FORGET = $args{req_type} eq 'FIRE_FORGET';
391 67         225 my $SYNCHRONOUS = $args{req_type} eq 'SYNCHRONOUS';
392 67         158 my $raise_error = $args{'raise_error'};
393 67         123 my $req_id;
394              
395             # JSON-RPC call
396             my $req = {
397             jsonrpc => '2.0',
398             method => "$service.$method",
399 67         509 params => $args{'params'},
400             };
401              
402             # Reuse or create a private topic which will receive responses
403             $send_args{'response_topic'} = $client->{response_topic} ||
404 67   66     433 $self->__create_response_topic;
405              
406 67 100       236 unless ($FIRE_FORGET) {
407             # Assign an unique request id (unique only for this client)
408 64         201 $req_id = $client->{correlation_id}++;
409 64         219 $req->{'id'} = $req_id;
410             }
411              
412 67         866 my $json = encode_json($req);
413              
414 67 50       264 if (exists $args{'buffer_id'}) {
415 0         0 $send_args{'buffer_id'} = $args{'buffer_id'};
416             }
417              
418             # Send request
419             $self->{_BUS}->publish(
420 67         844 payload => \$json,
421             qos => 1,
422             %send_args,
423             );
424              
425 67 100       337 if ($FIRE_FORGET) {
    100          
426             # Nothing else to do
427 3         17 return;
428             }
429             elsif ($SYNCHRONOUS) {
430              
431 51 100       321 $req->{_raise_error} = (defined $raise_error) ? $raise_error : 1;
432              
433             # Wait until a response is received in the reply queue
434 51         3104 $req->{_waiting_response} = AnyEvent->condvar;
435 51         905 $req->{_waiting_response}->begin;
436             }
437             else {
438              
439 13         31 $req->{_on_success_cb} = $args{'on_success'};
440 13         29 $req->{_on_error_cb} = $args{'on_error'};
441              
442 13 50 33     69 if ($raise_error && !$req->{_on_error_cb}) {
443             $req->{_on_error_cb} = sub {
444 0     0   0 my $errmsg = $_[0]->code . " " . $_[0]->message;
445 0         0 croak "Call to '$service.$method' failed: $errmsg";
446 0         0 };
447             }
448              
449             # Use shared cv for all requests
450 13 100 100     100 if (!$client->{async_cv} || $client->{async_cv}->ready) {
451 4         153 $client->{async_cv} = AnyEvent->condvar;
452             }
453              
454 13         89 $req->{_waiting_response} = $client->{async_cv};
455 13         39 $req->{_waiting_response}->begin;
456             }
457              
458 64         1157 $client->{in_progress}->{$req_id} = $req;
459              
460             # Ensure that timeout is set properly when the event loop was blocked
461 64 100       283 if ($__now != CORE::time) { $__now = CORE::time; AnyEvent->now_update }
  24         54  
  24         244  
462              
463             # Request timeout timer
464 64   100     904 my $timeout = $args{'timeout'} || REQ_TIMEOUT;
465             $req->{_timeout} = AnyEvent->timer( after => $timeout, cb => sub {
466 2     2   15790 my $req = delete $client->{in_progress}->{$req_id};
467 2         30 $req->{_response} = Beekeeper::JSONRPC::Error->request_timeout;
468 2 50       8 $req->{_on_error_cb}->($req->{_response}) if $req->{_on_error_cb};
469 2         7 $req->{_waiting_response}->end;
470 64         937 });
471              
472 64         2880 bless $req, 'Beekeeper::JSONRPC::Request';
473 64         561 return $req;
474             }
475              
476             sub __create_response_topic {
477 6     6   21 my $self = shift;
478 6         15 my $client = $self->{_CLIENT};
479              
480 6         84 my ($file, $line) = (caller(2))[1,2];
481 6         40 my $at = "at $file line $line\n";
482              
483             # Subscribe to an exclusive topic for receiving RPC responses
484              
485 6         25 my $response_topic = 'priv/' . $self->{_BUS}->{client_id};
486 6         14 $client->{response_topic} = $response_topic;
487              
488             $self->{_BUS}->subscribe(
489             topic => $response_topic,
490             maximum_qos => 0,
491             on_publish => sub {
492 64     64   239 my ($payload_ref, $mqtt_properties) = @_;
493              
494 64         135 my $resp;
495 64         186 local $@;
496 64         180 eval {
497              
498 64 50       332 if (substr($$payload_ref,0,1) eq "\x78") {
499 0         0 my $decompressed_json;
500 0         0 $INFLATE->inflate($payload_ref, $decompressed_json);
501 0         0 $INFLATE->inflateReset();
502 0         0 $resp = decode_json($decompressed_json);
503             }
504             else {
505 64         1696 $resp = decode_json($$payload_ref);
506             }
507             };
508              
509 64 50 33     905 unless (ref $resp eq 'HASH' && $resp->{jsonrpc} eq '2.0') {
510 0         0 warn "Received invalid JSON-RPC 2.0 message $at";
511 0         0 return;
512             }
513              
514 64 50       257 if (exists $resp->{'id'}) {
515              
516             # Response of an RPC request
517              
518 64         214 my $req_id = $resp->{'id'};
519 64         283 my $req = delete $client->{in_progress}->{$req_id};
520              
521             # Ignore unexpected responses
522 64 100       236 return unless $req;
523              
524             # Cancel request timeout
525 62         566 delete $req->{_timeout};
526              
527 62 100       203 if (exists $resp->{'result'}) {
528             # Success response
529 57         531 $req->{_response} = bless $resp, 'Beekeeper::JSONRPC::Response';
530 57 100       278 $req->{_on_success_cb}->($resp) if $req->{_on_success_cb};
531             }
532             else {
533             # Error response
534 5         37 $req->{_response} = bless $resp, 'Beekeeper::JSONRPC::Error';
535 5 100       19 $req->{_on_error_cb}->($resp) if $req->{_on_error_cb};
536             }
537            
538 62         465 $req->{_waiting_response}->end;
539             }
540             else {
541              
542             # Unicasted notification
543              
544 0         0 bless $resp, 'Beekeeper::JSONRPC::Notification';
545 0         0 $resp->{_headers} = $mqtt_properties;
546              
547 0         0 my $method = $resp->{method};
548              
549 0 0 0     0 unless (defined $method && $method =~ m/^([\.\w-]+)\.([\w-]+)$/) {
550 0         0 warn "Received notification with invalid method '$method' $at";
551 0         0 return;
552             }
553              
554             my $cb = $client->{callbacks}->{"msg.$1.$2"} ||
555 0   0     0 $client->{callbacks}->{"msg.$1.*"};
556              
557 0 0       0 unless ($cb) {
558 0         0 warn "No callback found for received notification '$method' $at";
559 0         0 return;
560             }
561              
562 0         0 $cb->($resp->{params}, $resp);
563             }
564             },
565             on_suback => sub {
566 6     6   20 my ($success, $prop) = @_;
567 6 50       82 die "Could not subscribe to response topic '$response_topic' $at" unless $success;
568             }
569 6         222 );
570              
571 6         58 return $response_topic;
572             }
573              
574             sub wait_async_calls {
575 2     2 1 12 my ($self) = @_;
576              
577             # Wait for all pending async requests
578 2         7 my $cv = delete $self->{_CLIENT}->{async_cv};
579 2 50       8 return unless defined $cv;
580              
581             # Make AnyEvent to allow one level of recursive condvar blocking, as we may
582             # block both in $worker->__work_forever and here
583 2 50       8 $AE_WAITING && Carp::confess "Recursive condvar blocking wait attempted";
584 2         7 local $AE_WAITING = 1;
585 2         4 local $AnyEvent::CondVar::Base::WAITING = 0;
586              
587 2         10 $cv->recv;
588             }
589              
590              
591             sub get_authentication_data {
592 0     0 1 0 my ($self) = @_;
593              
594 0         0 $self->{_CLIENT}->{auth_data};
595             }
596              
597             sub set_authentication_data {
598 0     0 1 0 my ($self, $data) = @_;
599              
600 0         0 $self->{_CLIENT}->{auth_data} = $data;
601             }
602              
603             sub __use_authorization_token {
604 18     18   181 my ($self, $token) = @_;
605              
606             # Using a hashing function makes harder to access the wrong worker pool by mistake,
607             # but it is not an effective access restriction: anyone with access to the backend
608             # bus credentials can easily inspect and clone auth data tokens
609              
610 18         212 my $salt = $self->{_CLIENT}->{auth_salt};
611              
612 18         110 my $adata_ref = \$self->{_CLIENT}->{auth_data};
613              
614 18         208 my $guard = Beekeeper::Client::Guard->new( $adata_ref );
615              
616 18         397 $$adata_ref = md5_base64($token . $salt);
617              
618 18         106 return $guard;
619             }
620              
621             1;
622              
623             package
624             Beekeeper::Client::Guard; # hide from PAUSE
625              
626             sub new {
627 18     18   122 my ($class, $ref) = @_;
628              
629 18         175 bless [$ref, $$ref], $class;
630             }
631              
632             sub DESTROY {
633              
634 18     18   73 ${$_[0]->[0]} = $_[0]->[1];
  18         177  
635             }
636              
637             1;
638              
639             __END__