File Coverage

lib/Beekeeper/Service/ToyBroker/Worker.pm
Criterion Covered Total %
statement 33 643 5.1
branch 0 274 0.0
condition 0 16 0.0
subroutine 11 69 15.9
pod 3 20 15.0
total 47 1022 4.6


line stmt bran cond sub pod time code
1             package Beekeeper::Service::ToyBroker::Worker;
2              
3 1     1   1197 use strict;
  1         3  
  1         31  
4 1     1   5 use warnings;
  1         2  
  1         44  
5              
6             our $VERSION = '0.09';
7              
8 1     1   5 use Beekeeper::Worker ':log';
  1         2  
  1         139  
9 1     1   7 use base 'Beekeeper::Worker';
  1         2  
  1         69  
10              
11 1     1   7 use Beekeeper::MQTT qw(:const :decode);
  1         3  
  1         327  
12 1     1   8 use Beekeeper::Config;
  1         2  
  1         23  
13              
14 1     1   6 use AnyEvent::Handle;
  1         2  
  1         38  
15 1     1   8 use AnyEvent::Socket;
  1         3  
  1         133  
16 1     1   16 use Scalar::Util 'weaken';
  1         2  
  1         44  
17 1     1   992 use Carp;
  1         4  
  1         61  
18              
19 1     1   6 use constant DEBUG => 0;
  1         2  
  1         7771  
20              
21              
22             sub new {
23 0     0 0   my ($class, %args) = @_;
24              
25 0           my $self = $class->SUPER::new(%args);
26              
27 0           $self->start_broker;
28              
29             # Postponed initialization
30 0           $self->SUPER::__init_client;
31 0           $self->{_LOGGER}->{_BUS} = $self->{_BUS};
32 0           $self->SUPER::__init_auth_tokens;
33 0           $self->SUPER::__init_worker;
34              
35 0           return $self;
36             }
37              
38       0     sub __init_client { }
39       0     sub __init_auth_tokens { }
40       0     sub __init_worker { }
41       0 1   sub on_startup { }
42              
43             sub on_shutdown {
44 0     0 1   my $self = shift;
45              
46 0           log_info "Shutting down";
47              
48             # Wait for clients to gracefully disconnect
49 0           for (1..60) {
50 0           my $conn_count = scalar keys %{$self->{connections}};
  0            
51 0 0         last if $conn_count <= 1; # our self connection
52 0           my $wait = AnyEvent->condvar;
53 0           my $tmr = AnyEvent->timer( after => 0.5, cb => $wait );
54 0           $wait->recv;
55             }
56              
57             # Get rid of our self connection
58 0           $self->{_BUS}->disconnect;
59              
60 0           log_info "Stopped";
61             }
62              
63             sub authorize_request {
64 0     0 1   my ($self, $req) = @_;
65              
66 0           return BKPR_REQUEST_AUTHORIZED;
67             }
68              
69             sub start_broker {
70 0     0 0   my ($self) = @_;
71              
72 0           $self->{connections} = {};
73 0           $self->{clients} = {};
74 0           $self->{topics} = {};
75 0           $self->{users} = {};
76              
77 0           my $config = Beekeeper::Config->read_config_file( 'toybroker.config.json' );
78              
79             # Start a default listener if no config found
80 0 0         $config = [ {} ] unless defined $config;
81              
82 0           foreach my $listener (@$config) {
83              
84 0 0         if ($listener->{users}) {
85 0           %{$self->{users}} = ( %{$self->{users}}, %{$listener->{users}} );
  0            
  0            
  0            
86             }
87              
88 0           $self->start_listener( $listener );
89             }
90             }
91              
92             sub start_listener {
93 0     0 0   my ($self, $listener) = @_;
94 0           weaken($self);
95              
96 0           my $max_packet_size = $listener->{'max_packet_size'};
97              
98 0   0       my $addr = $listener->{'listen_addr'} || '127.0.0.1'; # Must be an IPv4 or IPv6 address
99 0   0       my $port = $listener->{'listen_port'} || 1883;
100              
101 0           ($addr) = ($addr =~ m/^([\w\.:]+)$/); # untaint
102 0           ($port) = ($port =~ m/^(\d+)$/);
103              
104 0           log_info "Listening on $addr:$port";
105              
106             $self->{"listener-$addr-$port"} = tcp_server ($addr, $port, sub {
107 0     0     my ($FH, $host, $port) = @_;
108              
109 0           my $packet_type;
110             my $packet_flags;
111              
112 0           my $rbuff_len;
113 0           my $packet_len;
114              
115 0           my $mult;
116 0           my $offs;
117 0           my $byte;
118              
119 0           my $fh; $fh = AnyEvent::Handle->new(
120             fh => $FH,
121             keepalive => 1,
122             no_delay => 1,
123             on_read => sub {
124              
125             PARSE_PACKET: {
126              
127 0           $rbuff_len = length $fh->{rbuf};
  0            
128              
129 0 0         return unless $rbuff_len >= 2;
130              
131 0 0         unless ($packet_type) {
132              
133 0           $packet_len = 0;
134 0           $mult = 1;
135 0           $offs = 1;
136              
137             PARSE_LEN: {
138 0           $byte = unpack "C", substr( $fh->{rbuf}, $offs++, 1 );
  0            
139 0           $packet_len += ($byte & 0x7f) * $mult;
140 0 0         last unless ($byte & 0x80);
141 0 0         return if ($offs >= $rbuff_len); # Not enough data
142 0           $mult *= 128;
143 0 0         redo if ($offs < 5);
144             }
145              
146 0 0 0       if ($max_packet_size && $packet_len > $max_packet_size) {
147 0           $self->disconnect($fh, reason_code => 0x95);
148 0           return;
149             }
150              
151 0           $byte = unpack('C', substr( $fh->{rbuf}, 0, 1 ));
152 0           $packet_type = $byte >> 4;
153 0           $packet_flags = $byte & 0x0F;
154             }
155              
156 0 0         if ($rbuff_len < ($offs + $packet_len)) {
157             # Not enough data
158 0           return;
159             }
160              
161             # Consume packet from buffer
162 0           my $packet = substr($fh->{rbuf}, 0, ($offs + $packet_len), '');
163              
164             # Trim fixed header from packet
165 0           substr($packet, 0, $offs, '');
166              
167 0 0         if ($packet_type == MQTT_PUBLISH) {
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
168              
169 0           $self->_receive_publish($fh, \$packet, $packet_flags);
170             }
171             elsif ($packet_type == MQTT_PUBACK) {
172              
173 0           $self->_receive_puback($fh, \$packet);
174             }
175             elsif ($packet_type == MQTT_PINGREQ) {
176              
177 0           $self->pingresp($fh);
178             }
179             elsif ($packet_type == MQTT_PINGRESP) {
180              
181 0           $self->_receive_pingresp($fh);
182             }
183             elsif ($packet_type == MQTT_SUBSCRIBE) {
184              
185 0           $self->_receive_subscribe($fh, \$packet);
186             }
187             elsif ($packet_type == MQTT_UNSUBSCRIBE) {
188              
189 0           $self->_receive_unsubscribe($fh, \$packet);
190             }
191             elsif ($packet_type == MQTT_CONNECT) {
192              
193 0           $self->_receive_connect($fh, \$packet);
194             }
195             elsif ($packet_type == MQTT_DISCONNECT) {
196              
197 0           $self->_receive_disconnect($fh, \$packet);
198             }
199             elsif ($packet_type == MQTT_PUBREC) {
200              
201 0           $self->_receive_pubrec($fh, \$packet);
202             }
203             elsif ($packet_type == MQTT_PUBREL) {
204            
205 0           $self->_receive_pubrel($fh, \$packet);
206             }
207             elsif ($packet_type == MQTT_PUBCOMP) {
208              
209 0           $self->_receive_pubcomp($fh, \$packet);
210             }
211             elsif ($packet_type == MQTT_AUTH) {
212              
213 0           $self->_receive_auth($fh, \$packet);
214             }
215             else {
216             # Protocol error
217 0           log_warn "Received packet with unknown type $packet_type";
218 0           $self->disconnect($fh, reason_code => 0x81);
219 0           return;
220             }
221              
222             # Prepare for next frame
223 0           undef $packet_type;
224              
225             # Handle could have been destroyed at this point
226 0 0         redo PARSE_PACKET if defined $fh->{rbuf};
227             }
228             },
229             on_eof => sub {
230             # Clean disconnection, client will not write anymore
231 0           $self->remove_client($fh);
232 0           delete $self->{connections}->{"$fh"};
233             },
234             on_error => sub {
235 0           log_error "$_[2]\n";
236 0           $self->remove_client($fh);
237 0           delete $self->{connections}->{"$fh"};
238             }
239 0           );
240              
241 0           $self->{connections}->{"$fh"} = $fh;
242              
243             #TODO: Close connection on login timeout
244             # my $login_tmr = AnyEvent->timer( after => 5, cb => sub {
245             # $self->_shutdown($fh) unless $self->get_client($fh);
246             # });
247 0           });
248             }
249              
250             sub _receive_connect {
251 0     0     my ($self, $fh, $packet) = @_;
252              
253 0           my %prop;
254 0           my $offs = 0;
255              
256             # 3.1.2.1 Protocol Name (utf8 string)
257 0           $prop{'protocol_name'} = _decode_utf8_str($packet, \$offs);
258              
259             # 3.1.2.2 Protocol Version (byte)
260 0           $prop{'protocol_version'} = _decode_byte($packet, \$offs);
261              
262             # 3.1.2.3 Connect Flags (byte)
263 0           my $flags = _decode_byte($packet, \$offs);
264 0 0         $prop{'clean_start'} = 1 if $flags & 0x02; # 3.1.2.4 Clean Start
265 0 0         $prop{'username'} = 1 if $flags & 0x80; # 3.1.2.8 User Name Flag
266 0 0         $prop{'password'} = 1 if $flags & 0x40; # 3.1.2.9 Password Flag
267 0 0         $prop{'will_flag'} = 1 if $flags & 0x04; # 3.1.2.5 Will Flag
268 0           $prop{'will_qos'} = ($flags & 0x18) >> 3; # 3.1.2.6 Will QoS
269 0 0         $prop{'will_retain'} = 1 if $flags & 0x20; # 3.1.2.7 Will Retain
270              
271             # 3.1.2.10 Keep Alive (short int)
272 0           $prop{'keep_alive'} = _decode_int_16($packet, \$offs);
273              
274             # 3.1.2.11.1 Properties Length (variable length int)
275 0           my $prop_len = _decode_var_int($packet, \$offs);
276 0           my $prop_end = $offs + $prop_len;
277              
278 0           while ($offs < $prop_end) {
279              
280 0           my $prop_id = _decode_byte($packet, \$offs);
281              
282 0 0         if ($prop_id == MQTT_SESSION_EXPIRY_INTERVAL) {
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
283             # 3.1.2.11.2 Session Expiry Interval (long int)
284 0           $prop{'session_expiry_interval'} = _decode_int_32($packet, \$offs);
285             }
286             elsif ($prop_id == MQTT_RECEIVE_MAXIMUM) {
287             # 3.1.2.11.3 Receive Maximum (short int)
288 0           $prop{'receive_maximum'} = _decode_int_16($packet, \$offs);
289             }
290             elsif ($prop_id == MQTT_MAXIMUM_PACKET_SIZE) {
291             # 3.1.2.11.4 Maximum Packet Size (long int)
292 0           $prop{'maximum_packet_size'} = _decode_int_32($packet, \$offs);
293             }
294             elsif ($prop_id == MQTT_TOPIC_ALIAS_MAXIMUM) {
295             # 3.1.2.11.5 Topic Alias Maximum (short int)
296 0           $prop{'topic_alias_maximum'} = _decode_int_16($packet, \$offs);
297             }
298             elsif ($prop_id == MQTT_REQUEST_RESPONSE_INFORMATION) {
299             # 3.1.2.11.6 Request Response Information (byte)
300 0           $prop{'request_response_information'} = _decode_byte($packet, \$offs);
301             }
302             elsif ($prop_id == MQTT_REQUEST_PROBLEM_INFORMATION) {
303             # 3.1.2.11.7 Request Problem Information (byte)
304 0           $prop{'request_problem_information'} = _decode_byte($packet, \$offs);
305             }
306             elsif ($prop_id == MQTT_USER_PROPERTY) {
307             # 3.1.2.11.8 User Property (utf8 string pair)
308 0           my $key = _decode_utf8_str($packet, \$offs);
309 0           my $val = _decode_utf8_str($packet, \$offs);
310 0           $prop{$key} = $val;
311             }
312             elsif ($prop_id == MQTT_AUTHENTICATION_METHOD) {
313             # 3.1.2.11.9 Authentication Method (utf8 string)
314 0           $prop{'authentication_method'} = _decode_utf8_str($packet, \$offs);
315             }
316             elsif ($prop_id == MQTT_AUTHENTICATION_DATA) {
317             # 3.1.2.11.10 Authentication Data (binary data)
318 0           $prop{'authentication_data'} = _decode_binary_data($packet, \$offs);
319             }
320             else {
321             # Protocol error
322 0           log_warn "Received CONNECT with unknown property $prop_id";
323 0           $self->_shutdown($fh);
324 0           return;
325             }
326             }
327              
328             # 3.1.3.1 Client Identifier (utf8 string)
329 0           $prop{'client_identifier'} = _decode_utf8_str($packet, \$offs);
330              
331 0 0         if ($prop{'will'}) {
332              
333             # 3.1.3.2.1 Will Properties Length
334 0           my $prop_len = _decode_var_int($packet, \$offs);
335              
336             #TODO: 3.1.3.2 Will Properties
337 0           $offs += $prop_len;
338              
339             # 3.1.3.3 Will Topic (utf8 string)
340 0           $prop{'will_topic'} = _decode_utf8_str($packet, \$offs);
341              
342             # 3.1.3.4 Will Payload (binary data)
343 0           $prop{'will_payload'} = _decode_binary_data($packet, \$offs);
344             }
345              
346 0 0         if ($prop{'username'}) {
347             # 3.1.3.5 Username (utf8 string)
348 0           $prop{'username'} = _decode_utf8_str($packet, \$offs);
349             }
350              
351 0 0         if ($prop{'password'}) {
352             # 3.1.3.6 Password (binary data)
353 0           $prop{'password'} = _decode_utf8_str($packet, \$offs);
354             }
355              
356 0 0         unless ($prop{'protocol_version'} eq '5') {
357 0           log_warn "Received CONNECT with unsupported protocol version";
358 0           $self->_shutdown($fh);
359 0           return;
360             }
361              
362 0           $self->add_client($fh, \%prop);
363             }
364              
365             sub connack {
366 0     0 0   my ($self, $fh, %args) = @_;
367              
368 0           my $reason_code = delete $args{'reason_code'};
369 0           my $session_present = delete $args{'session_present'};
370              
371             # 3.2.2.3 Properties
372              
373 0           my $raw_prop;
374              
375 0 0         if (exists $args{'session_expiry_interval'}) {
376             # 3.2.2.3.2 Session Expiry Interval (long int)
377 0           $raw_prop .= pack("C N", MQTT_SESSION_EXPIRY_INTERVAL, delete $args{'session_expiry_interval'});
378             }
379              
380 0 0         if (exists $args{'receive_maximum'}) {
381             # 3.2.2.3.3 Receive Maximum (short int)
382 0           $raw_prop .= pack("C n", MQTT_RECEIVE_MAXIMUM, delete $args{'receive_maximum'});
383             }
384              
385 0 0         if (exists $args{'maximum_qos'}) {
386             # 3.2.2.3.4 Maximum QoS (byte)
387 0           $raw_prop .= pack("C C", MQTT_MAXIMUM_QOS, delete $args{'maximum_qos'});
388             }
389              
390 0 0         if (exists $args{'retain_available'}) {
391             # 3.2.2.3.5 Retain Available (byte)
392 0           $raw_prop .= pack("C C", MQTT_RETAIN_AVAILABLE, delete $args{'retain_available'});
393             }
394              
395 0 0         if (exists $args{'maximum_packet_size'}) {
396             # 3.2.2.3.6 Maximum Packet Size (long int)
397 0           $raw_prop .= pack("C N", MQTT_MAXIMUM_PACKET_SIZE, delete $args{'maximum_packet_size'});
398             }
399              
400 0 0         if (exists $args{'assigned_client_identifier'}) {
401             # 3.2.2.3.7 Assigned Client Identifier (utf8 string)
402 0           utf8::encode( $args{'assigned_client_identifier'} );
403 0           $raw_prop .= pack("C n/a*", MQTT_ASSIGNED_CLIENT_IDENTIFIER, delete $args{'assigned_client_identifier'});
404             }
405              
406 0 0         if (exists $args{'topic_alias_maximum'}) {
407             # 3.2.2.3.8 Topic Alias Maximum (short int)
408 0           $raw_prop .= pack("C n", MQTT_TOPIC_ALIAS_MAXIMUM, delete $args{'topic_alias_maximum'});
409             }
410              
411 0 0         if (exists $args{'reason_string'}) {
412             # 3.2.2.3.9 Reason String (utf8 string)
413 0           utf8::encode( $args{'reason_string'} );
414 0           $raw_prop .= pack("C n/a*", MQTT_REASON_STRING, delete $args{'reason_string'});
415             }
416              
417 0 0         if (exists $args{'wildcard_subscription_available'}) {
418             # 3.2.2.3.11 Wildcard Subscription Available (byte)
419 0           $raw_prop .= pack("C C", MQTT_WILDCARD_SUBSCRIPTION_AVAILABLE, delete $args{'wildcard_subscription_available'});
420             }
421              
422 0 0         if (exists $args{'subscription_identifier_available'}) {
423             # 3.2.2.3.12 Subscription Identifiers Available (byte)
424 0           $raw_prop .= pack("C C", MQTT_SUBSCRIPTION_IDENTIFIER_AVAILABLE, delete $args{'subscription_identifier_available'});
425             }
426              
427 0 0         if (exists $args{'shared_subscription_available'}) {
428             # 3.2.2.3.13 Shared Subscription Available (byte)
429 0           $raw_prop .= pack("C C", MQTT_SHARED_SUBSCRIPTION_AVAILABLE, delete $args{'shared_subscription_available'});
430             }
431              
432 0 0         if (exists $args{'server_keep_alive'}) {
433             # 3.2.2.3.14 Server Keep Alive (short int)
434 0           $raw_prop .= pack("C n", MQTT_SERVER_KEEP_ALIVE, delete $args{'server_keep_alive'});
435             }
436              
437 0 0         if (exists $args{'response_information'}) {
438             # 3.2.2.3.15 Response Information (utf8 string)
439 0           utf8::encode( $args{'response_information'} );
440 0           $raw_prop .= pack("C n/a*", MQTT_RESPONSE_INFORMATION, delete $args{'response_information'});
441             }
442              
443 0 0         if (exists $args{'server_reference'}) {
444             # 3.2.2.3.16 Server Reference (utf8 string)
445 0           utf8::encode( $args{'server_reference'} );
446 0           $raw_prop .= pack("C n/a*", MQTT_SERVER_REFERENCE, delete $args{'server_reference'});
447             }
448              
449 0 0         if (exists $args{'authentication_method'}) {
450             # 3.2.2.3.17 Authentication Method (utf8 string)
451 0           utf8::encode( $args{'authentication_method'} );
452 0           $raw_prop .= pack("C n/a*", MQTT_AUTHENTICATION_METHOD, delete $args{'authentication_method'});
453             }
454              
455 0 0         if (exists $args{'authentication_data'}) {
456             # 3.2.2.3.18 Authentication Data (binary data)
457 0           $raw_prop .= pack("C n/a*", MQTT_AUTHENTICATION_DATA, delete $args{'authentication_data'});
458             }
459              
460 0           foreach my $key (keys %args) {
461             # 3.2.2.3.10 User Property (utf8 string pair)
462 0           my $val = $args{$key};
463 0 0         next unless defined $val;
464 0           utf8::encode( $key );
465 0           utf8::encode( $val );
466 0           $raw_prop .= pack("C n/a* n/a*", MQTT_USER_PROPERTY, $key, $val);
467             }
468              
469             # 3.2.2 Variable Header
470              
471             # 3.2.2.1 Acknowledge flags (byte)
472 0   0       my $raw_mqtt = pack("C", $reason_code || 0);
473              
474             # 3.2.2.2 Reason code (byte)
475 0 0         $raw_mqtt .= pack("C", $session_present ? 0x01 : 0);
476              
477             # 3.2.2.3 Properties
478 0           $raw_mqtt .= _encode_var_int(length $raw_prop);
479 0           $raw_mqtt .= $raw_prop;
480              
481 0           $fh->push_write(
482             pack("C", MQTT_CONNACK << 4) . # 3.2.1 Packet type
483             _encode_var_int(length $raw_mqtt) . # 3.2.1 Packet length
484             $raw_mqtt
485             );
486             }
487              
488             sub _receive_disconnect {
489 0     0     my ($self, $fh, $packet) = @_;
490              
491             # Handle abbreviated packet
492 0 0         $$packet = "\x00\x00" if (length $$packet == 0);
493              
494             # 3.14.2.1 Reason Code (byte)
495 0           my $offs = 0;
496 0           my $reason_code = _decode_byte($packet, \$offs);
497              
498             # 3.14.2.2.1 Property Length (variable length int)
499 0           my $prop_len = _decode_var_int($packet, \$offs);
500 0           my $prop_end = $offs + $prop_len;
501              
502 0           my %prop = (
503             'reason_code' => $reason_code,
504             );
505              
506 0           while ($offs < $prop_end) {
507              
508 0           my $prop_id = _decode_byte($packet, \$offs);
509              
510 0 0         if ($prop_id == MQTT_SESSION_EXPIRY_INTERVAL) {
    0          
    0          
    0          
511             # 3.14.2.2.2 Session Expiry Interval (long int)
512 0           $prop{'session_expiry_interval'} = _decode_int_32($packet, \$offs);
513             }
514             elsif ($prop_id == MQTT_REASON_STRING) {
515             # 3.14.2.2.3 Reason String (utf8 string)
516 0           $prop{'reason_string'} = _decode_utf8_str($packet, \$offs);
517             }
518             elsif ($prop_id == MQTT_USER_PROPERTY) {
519             # 3.14.2.2.4 User Property (utf8 string pair)
520 0           my $key = _decode_utf8_str($packet, \$offs);
521 0           my $val = _decode_utf8_str($packet, \$offs);
522 0           $prop{$key} = $val;
523             }
524             elsif ($prop_id == MQTT_SERVER_REFERENCE) {
525             # 3.14.2.2.5 Server Reference (utf8 string)
526 0           $prop{'server_reference'} = _decode_utf8_str($packet, \$offs);
527             }
528             else {
529             # Protocol error
530 0           log_warn "Received DISCONNECT with unknown property $prop_id";
531 0           $self->_shutdown($fh);
532 0           return;
533             }
534             }
535              
536 0           $self->_shutdown($fh);
537             }
538              
539             sub disconnect {
540 0     0 0   my ($self, $fh, %args) = @_;
541              
542 0           my $reason_code = delete $args{'reason_code'};
543              
544             # 3.14.2.2 Properties
545              
546 0           my $raw_prop = '';
547              
548 0 0         if (exists $args{'reason_string'}) {
549             # 3.14.2.2.3 Reason String (utf8 string)
550 0           utf8::encode( $args{'reason_string'} );
551 0           $raw_prop .= pack("C n/a*", MQTT_REASON_STRING, delete $args{'reason_string'});
552             }
553              
554 0 0         if (exists $args{'server_reference'}) {
555             # 3.14.2.2.5 Server Reference (utf8 string)
556 0           utf8::encode( $args{'server_reference'} );
557 0           $raw_prop .= pack("C n/a*", MQTT_SERVER_REFERENCE, delete $args{'server_reference'});
558             }
559              
560 0           foreach my $key (keys %args) {
561             # 3.14.2.2.4 User Property (utf8 string pair)
562 0           my $val = $args{$key};
563 0 0         next unless defined $val;
564 0           utf8::encode( $key );
565 0           utf8::encode( $val );
566 0           $raw_prop .= pack("C n/a* n/a*", MQTT_USER_PROPERTY, $key, $val);
567             }
568              
569             # 3.14.2 Variable Header
570              
571             # 3.14.2.1 Disconnect Reason Code (byte)
572 0   0       my $raw_mqtt = pack("C", $reason_code || 0);
573              
574             # 3.14.2.2 Properties
575 0           $raw_mqtt .= _encode_var_int(length $raw_prop);
576 0           $raw_mqtt .= $raw_prop;
577              
578 0           $fh->push_write(
579             pack("C", MQTT_DISCONNECT << 4) . # 3.14.1 Packet type
580             _encode_var_int(length $raw_mqtt) . # 3.14.1 Packet length
581             $raw_mqtt
582             );
583              
584 0           $self->_shutdown($fh);
585             }
586              
587             sub _shutdown {
588 0     0     my ($self, $fh) = @_;
589              
590 0           $self->remove_client($fh);
591              
592 0           delete $self->{connections}->{"$fh"};
593             }
594              
595             sub _receive_subscribe {
596 0     0     my ($self, $fh, $packet) = @_;
597              
598             # 3.8.2 Packet identifier (short int)
599 0           my $offs = 0;
600 0           my $packet_id = _decode_int_16($packet, \$offs);
601              
602             # 3.8.2.1 Properties Length (variable length int)
603 0           my $prop_len = _decode_var_int($packet, \$offs);
604 0           my $prop_end = $offs + $prop_len;
605 0           my %prop;
606              
607 0           while ($offs < $prop_end) {
608              
609 0           my $prop_id = _decode_byte($packet, \$offs);
610              
611 0 0         if ($prop_id == MQTT_SUBSCRIPTION_IDENTIFIER) {
    0          
612             # 3.8.2.1.2 Subscription Identifier (variable len int)
613 0           $prop{'subscription_identifier'} = _decode_var_int($packet, \$offs);
614             }
615             elsif ($prop_id == MQTT_USER_PROPERTY) {
616             # 3.8.2.1.3 User Property (utf8 string pair)
617 0           my $key = _decode_utf8_str($packet, \$offs);
618 0           my $val = _decode_utf8_str($packet, \$offs);
619 0           $prop{$key} = $val;
620             }
621             else {
622             # Protocol error
623 0           log_warn "Received SUBSCRIBE with unexpected property $prop_id";
624 0           $self->disconnect($fh, reason_code => 0x81);
625 0           return;
626             }
627             }
628              
629             # 3.8.3 Payload
630              
631 0           my @reason_codes;
632              
633 0           while ($offs < length $$packet) {
634              
635             # 3.8.3 Topic Filter (utf8 string)
636 0           $prop{'topic_filter'} = _decode_utf8_str($packet, \$offs);
637              
638             # 3.8.3.1 Subscription Options (byte)
639 0           my $options = _decode_byte($packet, \$offs);
640              
641 0           $prop{'maximum_qos'} = ($options & 0x03);
642 0           $prop{'no_local'} = ($options & 0x04) >> 2;
643 0           $prop{'retain_as_published'} = ($options & 0x08) >> 3;
644 0           $prop{'retain_handling'} = ($options & 0x30) >> 4;
645              
646 0           my $reason_code = $self->subscribe_client($fh, \%prop);
647              
648 0           push @reason_codes, $reason_code;
649             }
650              
651 0           $self->suback( $fh,
652             packet_id => $packet_id,
653             reason_codes => \@reason_codes,
654             );
655             }
656              
657             sub suback {
658 0     0 0   my ($self, $fh, %args) = @_;
659              
660 0           my $packet_id = delete $args{'packet_id'};
661 0           my $reason_codes = delete $args{'reason_codes'};
662              
663             # 3.9.2.1 Properties
664              
665 0           my $raw_prop = '';
666              
667 0 0         if (exists $args{'reason_string'}) {
668             # 3.9.2.1.2 Reason String (utf8 string)
669 0           utf8::encode( $args{'reason_string'} );
670 0           $raw_prop .= pack("C n/a*", MQTT_REASON_STRING, delete $args{'reason_string'});
671             }
672              
673 0           foreach my $key (keys %args) {
674             # 3.9.2.1.3 User Property (utf8 string pair)
675 0           my $val = $args{$key};
676 0 0         next unless defined $val;
677 0           utf8::encode( $key );
678 0           utf8::encode( $val );
679 0           $raw_prop .= pack("C n/a* n/a*", MQTT_USER_PROPERTY, $key, $val);
680             }
681              
682             # 3.9.2 Variable Header
683              
684             # 3.9.2 Packet id (short int)
685 0           my $raw_mqtt = pack("n", $packet_id);
686              
687             # 3.9.2.1 Properties
688 0           $raw_mqtt .= _encode_var_int(length $raw_prop);
689 0           $raw_mqtt .= $raw_prop;
690              
691             # 3.9.3 Payload
692              
693 0           foreach my $code (@$reason_codes) {
694             # 3.9.3 Reason Codes (byte)
695 0           $raw_mqtt .= pack("C", $code);
696             }
697              
698             $fh->push_write(
699 0           pack("C", MQTT_SUBACK << 4) . # 3.9.1 Packet type
700             _encode_var_int(length $raw_mqtt) . # 3.9.1 Packet length
701             $raw_mqtt
702             );
703             }
704              
705             sub _receive_unsubscribe {
706 0     0     my ($self, $fh, $packet) = @_;
707              
708             # 3.10.2 Packet identifier (short int)
709 0           my $offs = 0;
710 0           my $packet_id = _decode_int_16($packet, \$offs);
711              
712             # 3.10.2.1 Properties Length (variable length int)
713 0           my $prop_len = _decode_var_int($packet, \$offs);
714 0           my $prop_end = $offs + $prop_len;
715 0           my %prop;
716              
717 0           while ($offs < $prop_end) {
718              
719 0           my $prop_id = _decode_byte($packet, \$offs);
720              
721 0 0         if ($prop_id == MQTT_USER_PROPERTY) {
722             # 3.10.2.1.2 User Property (utf8 string pair)
723 0           my $key = _decode_utf8_str($packet, \$offs);
724 0           my $val = _decode_utf8_str($packet, \$offs);
725 0           $prop{$key} = $val;
726             }
727             else {
728             # Protocol error
729 0           log_warn "Received UNSUBSCRIBE with unexpected property $prop_id";
730 0           $self->disconnect($fh, reason_code => 0x81);
731 0           return;
732             }
733             }
734              
735             # 3.10.3 Payload
736              
737 0           my @reason_codes;
738              
739 0           while ($offs < length $$packet) {
740              
741             # 3.10.3 Topic Filter (utf8 string)
742 0           $prop{'topic_filter'} = _decode_utf8_str($packet, \$offs);
743              
744 0           my $reason_code = $self->unsubscribe_client($fh, \%prop);
745              
746 0           push @reason_codes, $reason_code;
747             }
748              
749 0           $self->unsuback( $fh,
750             packet_id => $packet_id,
751             reason_codes => \@reason_codes,
752             );
753             }
754              
755             sub unsuback {
756 0     0 0   my ($self, $fh, %args) = @_;
757              
758 0           my $packet_id = delete $args{'packet_id'};
759 0           my $reason_codes = delete $args{'reason_codes'};
760              
761             # 3.11.2.1 Properties
762              
763 0           my $raw_prop = '';
764              
765 0 0         if (exists $args{'reason_string'}) {
766             # 3.11.2.1.2 Reason String (utf8 string)
767 0           utf8::encode( $args{'reason_string'} );
768 0           $raw_prop .= pack("C n/a*", MQTT_REASON_STRING, delete $args{'reason_string'});
769             }
770              
771 0           foreach my $key (keys %args) {
772             # 3.11.2.1.3 User Property (utf8 string pair)
773 0           my $val = $args{$key};
774 0 0         next unless defined $val;
775 0           utf8::encode( $key );
776 0           utf8::encode( $val );
777 0           $raw_prop .= pack("C n/a* n/a*", MQTT_USER_PROPERTY, $key, $val);
778             }
779              
780             # 3.14.2 Variable Header
781              
782             # 3.11.2 Packet id (short int)
783 0           my $raw_mqtt = pack("n", $packet_id);
784              
785             # 3.11.2.1 Properties
786 0           $raw_mqtt .= _encode_var_int(length $raw_prop);
787 0           $raw_mqtt .= $raw_prop;
788              
789             # 3.11.3 Payload
790              
791 0           foreach my $code (@$reason_codes) {
792             # 3.11.3 Reason Codes (byte)
793 0           $raw_mqtt .= pack("C", $code);
794             }
795              
796             $fh->push_write(
797 0           pack("C", MQTT_UNSUBACK << 4) . # 3.11.1 Packet type
798             _encode_var_int(length $raw_mqtt) . # 3.11.1 Packet length
799             $raw_mqtt
800             );
801             }
802              
803             sub pingreq {
804 0     0 0   my ($self, $fh) = @_;
805              
806 0           $fh->push_write(
807             pack( "C C",
808             MQTT_PINGREQ << 4, # 3.12.1 Packet type
809             0, # 3.12.1 Remaining length
810             )
811             );
812             }
813              
814             sub pingresp {
815 0     0 0   my ($self, $fh) = @_;
816              
817 0           $fh->push_write(
818             pack( "C C",
819             MQTT_PINGRESP << 4, # 3.13.1 Packet type
820             0, # 3.13.1 Remaining length
821             )
822             );
823             }
824              
825             sub _receive_pingresp {
826 0     0     my ($self, $fh) = @_;
827              
828             # No action taken
829             }
830              
831             sub _receive_publish {
832 0     0     my ($self, $fh, $packet, $flags) = @_;
833              
834             # 3.3.2.1 Topic Name (utf8 str)
835 0           my $topic = unpack("n/a", $$packet);
836 0           my $offs = 2 + length $topic;
837 0           utf8::decode($topic);
838              
839 0           my %prop = (
840             'topic' => $topic,
841             'qos' => ($flags & 0x6) >> 1,
842             'dup' => ($flags & 0x8) >> 3,
843             );
844              
845             # 3.3.2.2 Packet Identifier (short int)
846 0 0         if ($prop{'qos'} > 0) {
847 0           $prop{'packet_id'} = unpack("n", substr($$packet, $offs, 2));
848 0           $offs += 2;
849             }
850              
851             # 3.3.2.3.1 Properties Length (variable length int)
852 0           my $prop_len = _decode_var_int($packet, \$offs);
853 0           my $prop_end = $offs + $prop_len;
854              
855 0           my @subscr_ids;
856             my $prop_id;
857              
858 0           while ($offs < $prop_end) {
859              
860 0           $prop_id = unpack("C", substr($$packet, $offs, 1));
861 0           $offs += 1;
862              
863 0 0         if ($prop_id == MQTT_PAYLOAD_FORMAT_INDICATOR) {
    0          
    0          
    0          
    0          
    0          
    0          
    0          
864             # 3.3.2.3.2 Payload Format Indicator (byte)
865 0           $prop{'payload_format'} = unpack("C", substr($$packet, $offs, 1));
866 0           $offs += 1;
867             }
868             elsif ($prop_id == MQTT_MESSAGE_EXPIRY_INTERVAL) {
869             # 3.3.2.3.3 Message Expiry Interval (long int)
870 0           $prop{'message_expiry_interval'} = unpack("N", substr($$packet, $offs, 4));
871 0           $offs += 4;
872             }
873             elsif ($prop_id == MQTT_TOPIC_ALIAS) {
874             # 3.3.2.3.4 Topic Alias (short int)
875 0           my $alias = unpack("n", substr($$packet, $offs, 2));
876 0           $offs += 2;
877 0 0         if (length $topic) {
878 0           $fh->{topic_alias}->{$alias} = $topic;
879             }
880             else {
881 0           $prop{'topic'} = $fh->{topic_alias}->{$alias};
882             }
883             }
884             elsif ($prop_id == MQTT_RESPONSE_TOPIC) {
885             # 3.3.2.3.5 Response Topic (utf8 string)
886 0           my $resp_topic = unpack("n/a", substr($$packet, $offs));
887 0           $offs += 2 + length $resp_topic;
888 0           utf8::decode( $resp_topic );
889 0           $prop{'response_topic'} = $resp_topic;
890             }
891             elsif ($prop_id == MQTT_CORRELATION_DATA) {
892             # 3.3.2.3.6 Correlation Data (binary data)
893 0           $prop{'correlation_data'} = unpack("n/a", substr($$packet, $offs));
894 0           $offs += 2 + length $prop{'correlation_data'};
895             }
896             elsif ($prop_id == MQTT_USER_PROPERTY) {
897             # 3.3.2.3.7 User Property (utf8 string pair)
898 0           my ($key, $val) = unpack("n/a n/a", substr($$packet, $offs));
899 0           $offs += 4 + length($key) + length($val);
900 0           utf8::decode( $key );
901 0           utf8::decode( $val );
902 0           $prop{$key} = $val;
903             }
904             elsif ($prop_id == MQTT_SUBSCRIPTION_IDENTIFIER) {
905             # 3.3.2.3.8 Subscription Identifier (variable int)
906 0           push @subscr_ids, _decode_var_int($packet, \$offs);
907             }
908             elsif ($prop_id == MQTT_CONTENT_TYPE) {
909             # 3.3.2.3.9 Content Type (utf8 string)
910 0           my $content_type = unpack("n/a", substr($$packet, $offs));
911 0           $offs += 2 + length $content_type;
912 0           utf8::decode( $content_type );
913 0           $prop{'content_type'} = $content_type;
914             }
915             else {
916             # Protocol error
917 0           log_warn "Received PUBLISH with unknown property $prop_id";
918 0           $self->disconnect($fh, reason_code => 0x81);
919 0           return;
920             }
921             }
922              
923             # Trim variable header from packet, the remaining is the payload
924 0           substr($$packet, 0, $prop_end, '');
925              
926 0 0         if ($prop{'payload_format'}) {
927             # Payload is UTF-8 Encoded Character Data
928 0           utf8::decode( $$packet );
929             }
930              
931 0 0         if ($prop{'qos'} == 1) {
932             # Acknowledge received message
933 0           $self->puback( $fh, packet_id => $prop{'packet_id'} );
934 0           delete $prop{'packet_id'};
935             }
936              
937 0           $prop{'payload'} = $packet;
938              
939 0           $self->incoming_message($fh, \%prop);
940             }
941              
942             sub publish {
943 0     0 0   my ($self, $fh, %args) = @_;
944              
945 0           my $topic = delete $args{'topic'};
946 0           my $payload = delete $args{'payload'};
947 0           my $qos = delete $args{'qos'};
948 0           my $dup = delete $args{'duplicate'};
949 0           my $retain = delete $args{'retain'};
950 0           my $packet_id = delete $args{'packet_id'};
951 0           my $on_puback = delete $args{'on_puback'};
952              
953 0 0         croak "Message topic was not specified" unless defined $topic;
954              
955 0 0         $payload = '' unless defined $payload;
956 0 0         my $payload_ref = (ref $payload eq 'SCALAR') ? $payload : \$payload;
957              
958             #TODO: 3.3.2.3.4 Topic Alias
959 0           my $topic_alias;
960              
961             # 3.3.1.2 QoS level
962 0           my $flags = 0;
963 0 0         $flags |= $qos << 1 if $qos;
964 0 0         $flags |= 0x04 if $dup;
965 0 0         $flags |= 0x01 if $retain;
966              
967 0           my $raw_prop = '';
968              
969 0 0         if (utf8::is_utf8( $$payload_ref )) {
970             # 3.3.2.3.2 Payload Format Indicator (byte)
971 0           $raw_prop .= pack("C C", MQTT_PAYLOAD_FORMAT_INDICATOR, 0x01);
972 0           utf8::encode( $$payload_ref );
973             }
974              
975 0 0         if (exists $args{'message_expiry_interval'}) {
976             # 3.3.2.3.3 Message Expiry Interval (long int)
977 0           $raw_prop .= pack("C N", MQTT_MESSAGE_EXPIRY_INTERVAL, delete $args{'message_expiry_interval'});
978             }
979              
980 0 0         if ($topic_alias) {
981             # 3.3.2.3.4 Topic Alias (short int)
982 0           $raw_prop .= pack("C n", MQTT_TOPIC_ALIAS, $topic_alias);
983             }
984              
985 0 0         if (exists $args{'response_topic'}) {
986             # 3.3.2.3.5 Response Topic (utf8 string)
987 0           utf8::encode( $args{'response_topic'} );
988 0           $raw_prop .= pack("C n/a*", MQTT_RESPONSE_TOPIC, delete $args{'response_topic'});
989             }
990              
991 0 0         if (exists $args{'correlation_data'}) {
992             # 3.3.2.3.6 Correlation Data (binary data)
993 0           $raw_prop .= pack("C n/a*", MQTT_CORRELATION_DATA, delete $args{'correlation_data'});
994             }
995              
996 0 0         if (exists $args{'subscription_identifier'}) {
997             # 3.3.2.3.8 Subscription Identifier (variable int)
998             $raw_prop .= pack("C", MQTT_SUBSCRIPTION_IDENTIFIER) .
999 0           _encode_var_int( delete $args{'subscription_identifier'} );
1000             }
1001              
1002 0 0         if (exists $args{'content_type'}) {
1003             # 3.3.2.3.9 Content Type (utf8 string)
1004 0           utf8::encode( $args{'content_type'} );
1005 0           $raw_prop .= pack("C n/a*", MQTT_CONTENT_TYPE, delete $args{'content_type'});
1006             }
1007              
1008 0           foreach my $key (keys %args) {
1009             # 3.3.2.3.7 User Property (utf8 string pair)
1010 0           my $val = $args{$key};
1011 0 0         next unless defined $val;
1012 0           utf8::encode( $key );
1013 0           utf8::encode( $val );
1014 0           $raw_prop .= pack("C n/a* n/a*", MQTT_USER_PROPERTY, $key, $val);
1015             }
1016              
1017             # 3.3.2.1 Topic name (utf8 string)
1018 0           utf8::encode( $topic );
1019 0           my $raw_mqtt = pack("n/a*", $topic);
1020              
1021             # 3.3.2.2 Packet identifier (short int)
1022 0 0         $raw_mqtt .= pack("n", $packet_id) if $packet_id;
1023              
1024             # 3.3.2.3 Properties
1025 0           $raw_mqtt .= _encode_var_int(length $raw_prop);
1026 0           $raw_mqtt .= $raw_prop;
1027              
1028             # 3.3.3 Payload
1029 0           $raw_mqtt .= $$payload_ref;
1030              
1031 0           $raw_mqtt = pack("C", MQTT_PUBLISH << 4 | $flags) . # 3.3.1 Packet type
1032             _encode_var_int(length $raw_mqtt) . # 3.3.1 Packet length
1033             $raw_mqtt;
1034              
1035 0           $self->{_WORKER}->{notif_count}++; # track outgoing messages for stats
1036              
1037 0           $fh->push_write( $raw_mqtt );
1038             }
1039              
1040             sub puback {
1041 0     0 0   my ($self, $fh, %args) = @_;
1042              
1043 0 0         croak "Missing packet_id" unless $args{'packet_id'};
1044              
1045             my $raw_mqtt = pack(
1046             "C C n C",
1047             MQTT_PUBACK << 4, # 3.4.1 Packet type
1048             3, # 3.4.1 Remaining length
1049             $args{'packet_id'}, # 3.4.2 Packet identifier
1050 0   0       $args{'reason_code'} || 0, # 3.4.2.1 Reason code
1051             );
1052              
1053 0           $fh->push_write( $raw_mqtt );
1054             }
1055              
1056             sub _receive_puback {
1057 0     0     my ($self, $fh, $packet) = @_;
1058              
1059 0           my ($packet_id, $reason_code) = unpack("n C", $$packet);
1060 0 0         $reason_code = 0 unless defined $reason_code;
1061              
1062 0           $self->get_client($fh)->on_puback($packet_id);
1063             }
1064              
1065             sub _receive_pubrec {
1066 0     0     my ($self, $fh, $packet) = @_;
1067              
1068 0           $self->disconnect($fh, reason_code => 0x9B);
1069             }
1070              
1071             sub _receive_pubrel {
1072 0     0     my ($self, $fh, $packet) = @_;
1073              
1074 0           $self->disconnect($fh, reason_code => 0x9B);
1075             }
1076              
1077             sub _receive_pubcomp {
1078 0     0     my ($self, $fh, $packet) = @_;
1079              
1080 0           $self->disconnect($fh, reason_code => 0x9B);
1081             }
1082              
1083             sub _receive_pubauth {
1084 0     0     my ($self, $fh, $packet) = @_;
1085              
1086 0           $self->disconnect($fh, reason_code => 0x9B);
1087             }
1088              
1089              
1090             #------------------------------------------------------------------------------
1091              
1092             sub add_client {
1093 0     0 0   my ($self, $fh, $prop) = @_;
1094 0           weaken($self);
1095              
1096 0           my $client_id = $prop->{'client_identifier'};
1097 0           my $username = $prop->{'username'};
1098 0           my $password = $prop->{'password'};
1099              
1100 0           my $users_cfg = $self->{'users'};
1101 0           my $authorized;
1102              
1103             AUTH: {
1104              
1105 0 0         last unless (length $client_id);
  0            
1106 0 0         last unless (length $username);
1107 0 0         last unless (length $password);
1108              
1109 0 0         last unless ($users_cfg);
1110 0 0         last unless ($users_cfg->{$username});
1111 0 0         last unless ($users_cfg->{$username}->{'password'} eq $password);
1112              
1113 0           $authorized = 1;
1114             }
1115              
1116 0 0         unless ($authorized) {
1117 0           log_warn('Client not authorized');
1118 0           $self->_shutdown($fh);
1119 0           return;
1120             }
1121              
1122             my $client = Beekeeper::Service::ToyBroker::Client->new(
1123             client_id => $client_id,
1124 0     0     publish => sub { $self->publish($fh, @_) },
1125 0           );
1126              
1127 0           $self->{clients}->{"$fh"} = $client;
1128              
1129 0           $self->connack( $fh, maximum_qos => 1 );
1130             }
1131              
1132             sub get_client {
1133 0     0 0   my ($self, $fh) = @_;
1134              
1135 0           return $self->{clients}->{"$fh"};
1136             }
1137              
1138             sub remove_client {
1139 0     0 0   my ($self, $fh) = @_;
1140              
1141 0           my $client = $self->{clients}->{"$fh"};
1142              
1143 0 0         return unless $client; # called on eof after DISCONNECT
1144              
1145 0           foreach my $topic_filter (keys %{$client->{subscriptions}}) {
  0            
1146              
1147 0           $self->unsubscribe_client($fh, { topic_filter => $topic_filter });
1148             }
1149              
1150 0           $client->resend_unacked_messages;
1151              
1152 0           delete $self->{clients}->{"$fh"};
1153             }
1154              
1155             sub incoming_message {
1156 0     0 0   my ($self, $fh, $message) = @_;
1157              
1158 0           my @topics = values %{$self->{topics}};
  0            
1159              
1160 0           foreach my $topic (@topics) {
1161              
1162 0 0         next unless $message->{'topic'} =~ $topic->{topic_regex};
1163              
1164 0           foreach my $subscription (values %{$topic->{subscriptions}}) {
  0            
1165              
1166 0           $subscription->send_message( $message );
1167             }
1168             }
1169             }
1170              
1171             sub _validate_filter {
1172 0     0     my ($self, $topic_filter) = @_;
1173              
1174 0 0         return unless defined $topic_filter;
1175              
1176 0           $topic_filter =~ s|^\$share/([-\w]+)/||;
1177              
1178 0           my $shared_group = $1;
1179              
1180 0 0         return unless $topic_filter =~ m/^ (( [-\w]+ | \+ ) \/)* ( [-\w]+ | \+ | \# ) $/x;
1181              
1182 0           return ($topic_filter, $shared_group);
1183             }
1184              
1185             sub subscribe_client {
1186 0     0 0   my ($self, $fh, $prop) = @_;
1187              
1188 0           my ($topic_filter, $shared_group) = $self->_validate_filter( $prop->{'topic_filter'} );
1189              
1190 0 0         return 0x8F unless defined $topic_filter; # "Topic Filter invalid"
1191              
1192             #TODO: Access permissions
1193              
1194 0           my $topic = $self->{topics}->{$topic_filter};
1195              
1196 0 0         unless ($topic) {
1197 0           $topic = Beekeeper::Service::ToyBroker::TopicFilter->new( $topic_filter );
1198 0           $self->{topics}->{$topic_filter} = $topic;
1199             }
1200              
1201 0           my $client = $self->{clients}->{"$fh"};
1202              
1203 0 0         my $granted_qos = $prop->{'maximum_qos'} ? 1 : 0;
1204              
1205             my $subscription = Beekeeper::Service::ToyBroker::Subscription->new(
1206             id => $prop->{'subscription_identifier'},
1207 0           no_local => $prop->{'no_local'},
1208             max_qos => $granted_qos,
1209             client => $client,
1210             );
1211              
1212 0 0         if ($shared_group) {
1213 0           $topic->add_shared_subscription( $subscription, $shared_group );
1214             }
1215             else {
1216 0           $topic->add_subscription( $subscription, $client->client_id );
1217             }
1218              
1219 0           $client->{subscriptions}->{$prop->{topic_filter}} = 1;
1220              
1221 0           return $granted_qos;
1222             }
1223              
1224             sub unsubscribe_client {
1225 0     0 0   my ($self, $fh, $prop) = @_;
1226              
1227 0           my ($topic_filter, $shared_group) = $self->_validate_filter( $prop->{'topic_filter'} );
1228              
1229 0 0         return 0x8F unless defined $topic_filter; # "Topic Filter invalid"
1230              
1231 0           my $topic = $self->{topics}->{$topic_filter};
1232              
1233 0 0         return 0x11 unless defined $topic; # "No subscription existed"
1234              
1235 0           my $client = $self->{clients}->{"$fh"};
1236 0           my $client_id = $client->client_id;
1237 0           my $success;
1238              
1239 0 0         if ($shared_group) {
1240 0           $success = $topic->remove_shared_subscription( $client_id, $shared_group );
1241             }
1242             else {
1243 0           $success = $topic->remove_subscription( $client_id );
1244             }
1245              
1246 0 0         delete $self->{topics}->{$topic_filter} unless $topic->has_subscriptions;
1247              
1248 0           delete $client->{subscriptions}->{$prop->{topic_filter}};
1249              
1250 0 0         return $success ? 0x00 : 0x11;
1251             }
1252              
1253              
1254             package
1255             Beekeeper::Service::ToyBroker::Client; # hide from PAUSE
1256              
1257             sub new {
1258 0     0     my ($class, %args) = @_;
1259              
1260             my $self = {
1261             client_id => $args{'client_id'},
1262 0           publish => $args{'publish'},
1263             subscriptions => {},
1264             pending_ack => {},
1265             packet_seq => 1,
1266             };
1267              
1268 0           bless $self, $class;
1269             }
1270              
1271             sub client_id {
1272 0     0     my ($self) = @_;
1273              
1274 0           $self->{client_id};
1275             }
1276              
1277             sub publish {
1278 0     0     my ($self, $message, $sender) = @_;
1279              
1280 0           my $packet_id;
1281              
1282 0 0         if ($message->{'qos'}) {
1283              
1284 0           $packet_id = $self->{packet_seq}++;
1285 0 0         $self->{packet_seq} = 1 if ($packet_id == 0xFFFF);
1286              
1287 0           $self->{pending_ack}->{$packet_id} = [ $message, $sender ];
1288             }
1289              
1290 0 0         local $message->{'packet_id'} = $packet_id if $packet_id;
1291              
1292 0           $self->{publish}->( %$message );
1293             }
1294              
1295             sub on_puback {
1296 0     0     my ($self, $packet_id) = @_;
1297              
1298 0           delete $self->{pending_ack}->{$packet_id};
1299             }
1300              
1301             sub resend_unacked_messages {
1302 0     0     my ($self) = @_;
1303              
1304 0           my $pending_ack = $self->{pending_ack};
1305              
1306 0           foreach my $packet_id (keys %$pending_ack) {
1307              
1308 0           my $unacked = delete $pending_ack->{$packet_id};
1309              
1310 0           my ($message, $sender) = @$unacked;
1311              
1312 0 0         next unless $sender->has_subscriptions;
1313              
1314 0           $message->{'duplicate'} = 1;
1315              
1316 0           $sender->send_message( $message );
1317             }
1318             }
1319              
1320              
1321             package
1322             Beekeeper::Service::ToyBroker::TopicFilter; # hide from PAUSE
1323              
1324             sub new {
1325 0     0     my ($class, $topic_filter) = @_;
1326              
1327 0           my $topic_regex = $topic_filter;
1328 0           $topic_regex =~ s/\+/[^\/]+/g;
1329 0           $topic_regex =~ s/\#/.+/g;
1330              
1331 0           my $self = {
1332             subscriptions => {},
1333             topic_filter => $topic_filter,
1334             topic_regex => qr/^${topic_regex}$/,
1335             };
1336              
1337 0           bless $self, $class;
1338             }
1339              
1340             sub has_subscriptions {
1341 0     0     my ($self) = @_;
1342              
1343 0 0         return (scalar keys %{$self->{subscriptions}}) ? 1 : 0;
  0            
1344             }
1345              
1346             sub add_subscription {
1347 0     0     my ($self, $subscription, $client_id) = @_;
1348              
1349 0           $self->{subscriptions}->{"client/$client_id"} = $subscription;
1350             }
1351              
1352             sub add_shared_subscription {
1353 0     0     my ($self, $subscription, $shared_group) = @_;
1354              
1355 0           my $shared = $self->{subscriptions}->{"shared/$shared_group"};
1356              
1357 0 0         unless ($shared) {
1358 0           $shared = Beekeeper::Service::ToyBroker::SharedSubscription->new;
1359 0           $self->{subscriptions}->{"shared/$shared_group"} = $shared;
1360             }
1361              
1362 0           $shared->add_subscription( $subscription );
1363             }
1364              
1365             sub remove_subscription {
1366 0     0     my ($self, $client_id) = @_;
1367              
1368 0           my $existed = delete $self->{subscriptions}->{"client/$client_id"};
1369              
1370 0 0         return $existed ? 1 : 0;
1371             }
1372              
1373             sub remove_shared_subscription {
1374 0     0     my ($self, $client_id, $shared_group) = @_;
1375              
1376 0           my $shared = $self->{subscriptions}->{"shared/$shared_group"};
1377              
1378 0 0         return 0 unless $shared;
1379              
1380 0           my $success = $shared->remove_subscription( $client_id );
1381              
1382 0 0         delete $self->{subscriptions}->{"shared/$shared_group"} unless $shared->has_subscriptions;
1383              
1384 0           return $success;
1385             }
1386              
1387              
1388             package
1389             Beekeeper::Service::ToyBroker::Subscription; # hide from PAUSE
1390              
1391             sub new {
1392 0     0     my $class = shift;
1393              
1394 0           my $self = {
1395             id => undef,
1396             max_qos => undef,
1397             no_local => undef,
1398             client => undef,
1399             @_
1400             };
1401              
1402 0           bless $self, $class;
1403             }
1404              
1405             sub client {
1406 0     0     my ($self) = @_;
1407              
1408 0           $self->{client};
1409             }
1410              
1411             sub has_subscriptions {
1412 0     0     my ($self) = @_;
1413              
1414 0 0         return (scalar keys %{$self->{subscriptions}}) ? 1 : 0;
  0            
1415             }
1416              
1417             sub send_message {
1418 0     0     my ($self, $message, $sender) = @_;
1419              
1420 0 0         local $message->{'subscription_identifier'} = $self->{id} if $self->{id};
1421              
1422 0 0         local $message->{'qos'} = 0 if ($self->{max_qos} == 0);
1423              
1424 0   0       $self->{client}->publish( $message, $sender || $self );
1425             }
1426              
1427              
1428             package
1429             Beekeeper::Service::ToyBroker::SharedSubscription; # hide from PAUSE
1430              
1431             sub new {
1432 0     0     my ($class, %args) = @_;
1433              
1434 0           my $self = {
1435             subscriptions => {},
1436             subscr_keys => [],
1437             };
1438              
1439 0           bless $self, $class;
1440             }
1441              
1442             sub add_subscription {
1443 0     0     my ($self, $subscription) = @_;
1444              
1445 0           my $client_id = $subscription->client->client_id;
1446              
1447 0           push @{$self->{subscr_keys}}, "client/$client_id";
  0            
1448              
1449 0           $self->{subscriptions}->{"client/$client_id"} = $subscription;
1450             }
1451              
1452             sub remove_subscription {
1453 0     0     my ($self, $client_id) = @_;
1454              
1455 0           my $subscr_keys = $self->{subscr_keys};
1456 0           @$subscr_keys = grep { $_ ne "client/$client_id" } @$subscr_keys;
  0            
1457              
1458 0           my $existed = delete $self->{subscriptions}->{"client/$client_id"};
1459              
1460 0 0         return $existed ? 1 : 0;
1461             }
1462              
1463             sub has_subscriptions {
1464 0     0     my ($self) = @_;
1465              
1466 0 0         return (scalar keys %{$self->{subscriptions}}) ? 1 : 0;
  0            
1467             }
1468              
1469             sub send_message {
1470 0     0     my ($self, $message) = @_;
1471              
1472             # Round robin
1473 0           my $subscr_keys = $self->{subscr_keys};
1474 0           my $next = shift @$subscr_keys;
1475 0           push @$subscr_keys, $next;
1476              
1477 0           my $subscription = $self->{subscriptions}->{$next};
1478              
1479             #TODO: Prefer idle subscriptions
1480              
1481 0           $subscription->send_message( $message, $self );
1482             }
1483              
1484             1;
1485              
1486             __END__