File Coverage

blib/lib/Net/MQTT/Simple.pm
Criterion Covered Total %
statement 24 217 11.0
branch 7 132 5.3
condition 0 54 0.0
subroutine 6 32 18.7
pod 11 11 100.0
total 48 446 10.7


line stmt bran cond sub pod time code
1             package Net::MQTT::Simple;
2              
3 2     2   57577 use strict;
  2         4  
  2         55  
4 2     2   9 use warnings;
  2         4  
  2         362  
5              
6             our $VERSION = '1.27';
7              
8             # Please note that these are not documented and are subject to change:
9             our $KEEPALIVE_INTERVAL = 60;
10             our $PING_TIMEOUT = 10;
11             our $RECONNECT_INTERVAL = 5;
12             our $MAX_LENGTH = 2097152; # 2 MB
13             our $READ_BYTES = 16 * 1024; # 16 kB per IO::Socket::SSL recommendation
14             our $WRITE_BYTES = 16 * 1024; # 16 kB per IO::Socket::SSL maximum
15             our $PROTOCOL_LEVEL = 0x04; # 0x03 in v3.1, 0x04 in v3.1.1
16             our $PROTOCOL_NAME = "MQTT"; # MQIsdp in v3.1, MQTT in v3.1.1
17              
18             my $global;
19              
20             BEGIN {
21             *_socket_class =
22 2     0   1097 eval { require IO::Socket::IP; 1 } ? sub { "IO::Socket::IP" }
  2         64962  
  0         0  
23 0         0 : eval { require IO::Socket::INET; 1 } ? sub { "IO::Socket::INET" }
  0         0  
  0         0  
24 2 0   2   7 : die "Neither IO::Socket::IP nor IO::Socket::INET found";
    50          
25             }
26              
27 0     0   0 sub _default_port { 1883 }
28 0     0   0 sub _socket_error { "$@" }
29 0     0   0 sub _secure { 0 }
30              
31              
32 0     0   0 sub _client_identifier { my ($class) = @_; return "Net::MQTT::Simple[" . $class->{random_id} . "]"; }
  0         0  
33              
34             # Carp might not be available either.
35             sub _croak {
36 0     0   0 die sprintf "%s at %s line %d.\n", "@_", (caller 1)[1, 2];
37             }
38              
39             sub filter_as_regex {
40 555     555 1 255750 my ($filter) = @_;
41              
42 555 100       1345 return "^(?!\\\$)" if $filter eq '#'; # Match everything except /^\$/
43 535 100       856 return "^/" if $filter eq '/#'; # Parent (empty topic) is invalid
44              
45 518         853 $filter = quotemeta $filter;
46              
47 518         2168 $filter =~ s{ \z (?
48 518         1066 $filter =~ s{ \\ \/ \\ \# } {}x;
49 518         1029 $filter =~ s{ \\ \+ } {[^/]*+}xg;
50 518         883 $filter =~ s{ ^ (?= \[ \^ / \] \* ) } {(?!\\\$)}x; # No /^\$/ if /^\+/
51              
52 518         1248 return "^$filter";
53             }
54              
55             sub import {
56 2     2   31 my ($class, $server) = @_;
57 2 50       11 @_ <= 2 or _croak "Too many arguments for use " . __PACKAGE__;
58              
59 2 50       52 $server or return;
60              
61 0           $global = $class->new($server);
62              
63 2     2   18 no strict 'refs';
  2         4  
  2         4787  
64 0           *{ (caller)[0] . "::publish" } = \&publish;
  0            
65 0           *{ (caller)[0] . "::retain" } = \&retain;
  0            
66             }
67              
68             sub new {
69 0     0 1   my ($class, $server, $sockopts) = @_;
70 0 0 0       @_ == 2 or @_ == 3 or _croak "Wrong number of arguments for $class->new";
71              
72 0           my $port = $class->_default_port;
73              
74             # Add port for bare IPv6 address
75 0 0 0       $server = "[$server]:$port" if $server =~ /:.*:/ and not $server =~ /\[/;
76              
77             # Add port for bare IPv4 address or bracketed IPv6 address
78 0 0 0       $server .= ":$port" if $server !~ /:/ or $server =~ /^\[.*\]$/;
79              
80             # Create a random ID for the instance of the object
81 0           my $random_id = join "", map chr 65 + int rand 26, 1 .. 10;
82            
83 0   0       return bless {
84             server => $server,
85             last_connect => 0,
86             sockopts => $sockopts // {},
87             random_id => $random_id
88             }, $class;
89             }
90              
91             sub last_will {
92 0     0 1   my ($self, $topic, $message, $retain) = @_;
93              
94 0           my %old;
95 0 0         %old = %{ $self->{will} } if $self->{will};
  0            
96              
97 0 0         _croak "Wrong number of arguments for last_will" if @_ > 4;
98              
99 0 0         if (@_ >= 2) {
100 0 0 0       if (not defined $topic and not defined $message) {
101 0           delete $self->{will};
102 0           delete $self->{encoded_will};
103              
104 0           return;
105             } else {
106             $self->{will} = {
107             topic => $topic // $old{topic} // '',
108             message => $message // $old{message} // '',
109 0   0       retain => !!$retain // $old{retain} // 0,
      0        
      0        
      0        
      0        
      0        
110             };
111 0 0         _croak("Topic is empty") if not length $self->{will}->{topic};
112              
113 0           my $e = $self->{encoded_will} = { %{ $self->{will} } };
  0            
114 0           utf8::encode($e->{topic});
115 0 0         utf8::downgrade($e->{message}, 1) or do {
116 0           my ($file, $line, $method) = (caller 1)[1, 2, 3];
117 0           warn "Wide character in $method at $file line $line.\n";
118 0           utf8::encode($e->{message});
119             };
120             }
121             }
122              
123 0           return @{ $self->{will} }{qw/topic message retain/};
  0            
124             }
125              
126             sub login {
127 0     0 1   my ($self, $username, $password) = @_;
128              
129              
130 0 0         if (@_ > 1) {
131             _croak "Password login is disabled for insecure connections"
132             if defined $password
133 0 0 0       and not $self->_secure || $ENV{MQTT_SIMPLE_ALLOW_INSECURE_LOGIN};
      0        
134              
135 0           utf8::encode($username);
136 0           $self->{username} = $username;
137 0           $self->{password} = $password;
138             }
139              
140 0           return $username;
141             }
142              
143             sub _connect {
144 0     0     my ($self) = @_;
145              
146 0 0 0       return if $self->{socket} and $self->{socket}->connected;
147              
148 0 0         if ($self->{last_connect} > time() - $RECONNECT_INTERVAL) {
149 0           select undef, undef, undef, .01;
150 0           return;
151             }
152              
153             # Reset state
154 0           $self->{last_connect} = time;
155 0           $self->{buffer} = "";
156 0           delete $self->{ping};
157              
158             # Connect
159 0           my $socket_class = $self->_socket_class;
160             my %socket_options = (
161             PeerAddr => $self->{server},
162 0           %{ $self->{sockopts} }
  0            
163             );
164 0 0         $self->{socket} = $socket_class->new( %socket_options )
165             or warn "$0: connect: " . $self->_socket_error . "\n";
166              
167             # Say hello
168 0           local $self->{skip_connect} = 1; # avoid infinite recursion :-)
169 0           $self->_send_connect;
170 0           $self->_send_subscribe;
171             }
172              
173             sub _prepend_variable_length {
174             # Copied from Net::MQTT::Constants
175 0     0     my ($data) = @_;
176 0           my $v = length $data;
177 0           my $o = "";
178 0           my $d;
179 0           do {
180 0           $d = $v % 128;
181 0           $v = int($v/128);
182 0 0         $d |= 0x80 if $v;
183 0           $o .= pack "C", $d;
184             } while $d & 0x80;
185 0           return "$o$data";
186             }
187              
188             sub _send {
189 0     0     my ($self, $data) = @_;
190              
191 0 0         $self->_connect unless exists $self->{skip_connect};
192 0           delete $self->{skip_connect};
193              
194 0 0         my $socket = $self->{socket} or return;
195              
196 0           while (my $chunk = substr $data, 0, $WRITE_BYTES, "") {
197 0 0         syswrite $socket, $chunk
198             or $self->_drop_connection; # reconnect on next message
199             }
200              
201 0           $self->{last_send} = time;
202             }
203              
204             sub _send_connect {
205 0     0     my ($self) = @_;
206              
207 0           my $will = $self->{encoded_will};
208 0           my $flags = 0x02;
209 0 0         $flags |= 0x04 if $will;
210 0 0 0       $flags |= 0x20 if $will and $will->{retain};
211              
212 0 0         $flags |= 0x80 if defined $self->{username};
213 0 0 0       $flags |= 0x40 if defined $self->{username} and defined $self->{password};
214              
215             $self->_send("\x10" . _prepend_variable_length(pack(
216             "x C/a* C C n n/a*"
217             . ($flags & 0x04 ? "n/a* n/a*" : "")
218             . ($flags & 0x80 ? "n/a*" : "")
219             . ($flags & 0x40 ? "n/a*" : ""),
220             $PROTOCOL_NAME,
221             $PROTOCOL_LEVEL,
222             $flags,
223             $KEEPALIVE_INTERVAL,
224             $self->_client_identifier,
225             ($flags & 0x04 ? ($will->{topic}, $will->{message}) : ()),
226             ($flags & 0x80 ? $self->{username} : ()),
227 0 0         ($flags & 0x40 ? $self->{password} : ()),
    0          
    0          
    0          
    0          
    0          
228             )));
229             }
230              
231             sub _send_subscribe {
232 0     0     my ($self, @topics) = @_;
233              
234 0 0         if (not @topics) {
235 0 0         @topics = keys %{ $self->{sub} } or return;
  0            
236             }
237 0 0         return if not @topics;
238              
239 0           utf8::encode($_) for @topics;
240              
241             # Hardcoded "packet identifier" \0\x01 for now (was \0\0 but the spec
242             # disallows it for subscribe packets and mosquitto started enforcing that.)
243 0           $self->_send("\x82" . _prepend_variable_length("\0\x01" .
244             pack("(n/a* x)*", @topics) # x = QoS 0
245             ));
246             }
247              
248             sub _send_unsubscribe {
249 0     0     my ($self, @topics) = @_;
250              
251 0 0         return if not @topics;
252              
253 0           utf8::encode($_) for @topics;
254              
255             # Hardcoded "packet identifier" \0\0x01 for now; see above.
256 0           $self->_send("\xa2" . _prepend_variable_length("\0\x01" .
257             pack("(n/a*)*", @topics)
258             ));
259             }
260              
261             sub _parse {
262 0     0     my ($self) = @_;
263              
264 0           my $bufref = \$self->{buffer};
265              
266 0 0         return if length $$bufref < 2;
267              
268 0           my $offset = 1;
269              
270 0           my $length = do {
271 0           my $multiplier = 1;
272 0           my $v = 0;
273 0           my $d;
274 0           do {
275 0 0         return if $offset >= length $$bufref; # not enough data yet
276 0           $d = unpack "C", substr $$bufref, $offset++, 1;
277 0           $v += ($d & 0x7f) * $multiplier;
278 0           $multiplier *= 128;
279             } while ($d & 0x80);
280 0           $v;
281             };
282              
283 0 0         if ($length > $MAX_LENGTH) {
284             # On receiving an enormous packet, just disconnect to avoid exhausting
285             # RAM on tiny systems.
286             # TODO: just slurp and drop the data
287 0           $self->_drop_connection;
288 0           return;
289             }
290              
291 0 0         return if length($$bufref) < $offset + $length; # not enough data yet
292              
293 0           my $first_byte = unpack "C", substr $$bufref, 0, 1;
294              
295 0           my $packet = {
296             type => ($first_byte & 0xF0) >> 4,
297             dup => ($first_byte & 0x08) >> 3,
298             qos => ($first_byte & 0x06) >> 1,
299             retain => ($first_byte & 0x01),
300             data => substr($$bufref, $offset, $length),
301             };
302              
303 0           substr $$bufref, 0, $offset + $length, ""; # remove the parsed bits.
304              
305 0           return $packet;
306             }
307              
308             sub _incoming_publish {
309 0     0     my ($self, $packet) = @_;
310              
311             # Because QoS is not supported, no packed ID in the data. It would
312             # have been 16 bits between $topic and $message.
313 0           my ($topic, $message) = unpack "n/a a*", $packet->{data};
314              
315 0           utf8::decode($topic);
316              
317 0           for my $cb (@{ $self->{callbacks} }) {
  0            
318 0 0         if ($topic =~ /$cb->{regex}/) {
319 0           $cb->{callback}->($topic, $message, $packet->{retain});
320 0           return;
321             }
322             }
323             }
324              
325             sub _publish {
326 0     0     my ($self, $retain, $topic, $message) = @_;
327              
328 0 0 0       $message //= "" if $retain;
329              
330 0           utf8::encode($topic);
331 0 0         utf8::downgrade($message, 1) or do {
332 0           my ($file, $line, $method) = (caller 1)[1, 2, 3];
333 0           warn "Wide character in $method at $file line $line.\n";
334 0           utf8::encode($message);
335             };
336              
337 0 0         $self->_send(
338             ($retain ? "\x31" : "\x30")
339             . _prepend_variable_length(
340             pack("n/a*", $topic) . $message
341             )
342             );
343             }
344              
345             sub publish {
346 0     0 1   my $method = UNIVERSAL::isa($_[0], __PACKAGE__);
347 0 0         @_ == ($method ? 3 : 2) or _croak "Wrong number of arguments for publish";
    0          
348              
349 0 0         ($method ? shift : $global)->_publish(0, @_);
350             }
351              
352             sub retain {
353 0     0 1   my $method = UNIVERSAL::isa($_[0], __PACKAGE__);
354 0 0         @_ == ($method ? 3 : 2) or _croak "Wrong number of arguments for retain";
    0          
355              
356 0 0         ($method ? shift : $global)->_publish(1, @_);
357             }
358              
359             sub run {
360 0     0 1   my ($self, @subscribe_args) = @_;
361              
362 0 0         $self->subscribe(@subscribe_args) if @subscribe_args;
363              
364 0           until ($self->{stop_loop}) {
365 0           my @timeouts;
366             push @timeouts, $KEEPALIVE_INTERVAL - (time() - $self->{last_send})
367 0 0         if exists $self->{last_send};
368             push @timeouts, $PING_TIMEOUT - (time() - $self->{ping})
369 0 0         if exists $self->{ping};
370              
371             my $timeout = @timeouts
372 0 0         ? (sort { $a <=> $b } @timeouts)[0]
  0            
373             : 1; # default to 1
374              
375 0           $self->tick($timeout);
376             }
377              
378 0           delete $self->{stop_loop};
379             }
380              
381             sub subscribe {
382 0     0 1   my ($self, @kv) = @_;
383              
384 0           while (my ($topic, $callback) = splice @kv, 0, 2) {
385 0           $self->{sub}->{ $topic } = 1;
386 0           push @{ $self->{callbacks} }, {
  0            
387             topic => $topic,
388             regex => filter_as_regex($topic),
389             callback => $callback,
390             };
391             }
392              
393 0 0         $self->_send_subscribe() if $self->{socket};
394             }
395              
396             sub unsubscribe {
397 0     0 1   my ($self, @topics) = @_;
398              
399 0           $self->_send_unsubscribe(@topics);
400              
401 0           my $cb = $self->{callbacks};
402 0           for my $topic ( @topics ) {
403 0           @$cb = grep {$_->{topic} ne $topic} @$cb;
  0            
404             }
405              
406 0           delete @{ $self->{sub} }{ @topics };
  0            
407             }
408              
409             sub tick {
410 0     0 1   my ($self, $timeout) = @_;
411              
412 0           $self->_connect;
413              
414 0 0         my $socket = $self->{socket} or return;
415 0           my $bufref = \$self->{buffer};
416              
417 0           my $r = '';
418 0           vec($r, fileno($socket), 1) = 1;
419              
420 0 0 0       if (select $r, undef, undef, $timeout // 0) {
421             sysread $socket, $$bufref, $READ_BYTES, length $$bufref
422 0 0         or delete $self->{socket};
423              
424 0           while (length $$bufref) {
425 0 0         my $packet = $self->_parse() or last;
426 0 0         $self->_incoming_publish($packet) if $packet->{type} == 3;
427 0 0         delete $self->{ping} if $packet->{type} == 13;
428             }
429             }
430              
431 0 0         if (time() >= $self->{last_send} + $KEEPALIVE_INTERVAL) {
432 0           $self->_send("\xc0\0"); # PINGREQ
433 0           $self->{ping} = time;
434             }
435 0 0 0       if ($self->{ping} and time() >= $self->{ping} + $PING_TIMEOUT) {
436 0           $self->_drop_connection;
437             }
438              
439 0           return !! $self->{socket};
440             }
441              
442             sub disconnect {
443 0     0 1   my ($self) = @_;
444              
445             $self->_send(pack "C x", 0xe0)
446 0 0 0       if $self->{socket} and $self->{socket}->connected;
447              
448 0           $self->_drop_connection;
449             }
450              
451             sub _drop_connection {
452 0     0     my ($self) = @_;
453              
454 0           delete $self->{socket};
455 0           $self->{last_connect} = 0;
456             }
457              
458             1;
459              
460             __END__