File Coverage

blib/lib/Device/Serial/SLuRM.pm
Criterion Covered Total %
statement 177 186 95.1
branch 48 70 68.5
condition 12 21 57.1
subroutine 25 27 92.5
pod 7 8 87.5
total 269 312 86.2


line stmt bran cond sub pod time code
1             # You may distribute under the terms of either the GNU General Public License
2             # or the Artistic License (the same terms as Perl itself)
3             #
4             # (C) Paul Evans, 2022 -- leonerd@leonerd.org.uk
5              
6 6     6   582833 use v5.26;
  6         39  
7 6     6   3210 use Object::Pad 0.70 ':experimental(init_expr adjust_params)';
  6         53376  
  6         31  
8              
9             package Device::Serial::SLuRM 0.03;
10             class Device::Serial::SLuRM;
11              
12 6     6   2569 use Carp;
  6         13  
  6         401  
13              
14 6     6   3060 use Syntax::Keyword::Match;
  6         4401  
  6         34  
15              
16 6     6   850 use Future::AsyncAwait;
  6         15838  
  6         42  
17 6     6   2997 use Future::Buffer 0.03;
  6         11535  
  6         207  
18 6     6   518 use Future::IO;
  6         12536  
  6         225  
19              
20 6     6   2814 use Digest::CRC qw( crc8 );
  6         16122  
  6         544  
21              
22 6   50 6   50 use constant DEBUG => $ENV{SLURM_DEBUG} // 0;
  6         13  
  6         608  
23              
24             use constant {
25 6         35910 SLURM_PKTCTRL_META => 0x00,
26             SLURM_PKTCTRL_META_RESET => 0x01,
27             SLURM_PKTCTRL_META_RESETACK => 0x02,
28              
29             SLURM_PKTCTRL_NOTIFY => 0x10,
30              
31             SLURM_PKTCTRL_REQUEST => 0x30,
32              
33             SLURM_PKTCTRL_RESPONSE => 0xB0,
34             SLURM_PKTCTRL_ACK => 0xC0,
35             SLURM_PKTCTRL_ERR => 0xE0,
36 6     6   44 };
  6         14  
37              
38             =encoding UTF-8
39              
40             =head1 NAME
41              
42             C<Device::Serial::SLuRM> - communicate the SLµRM protocol over a serial port
43              
44             =head1 SYNOPSIS
45              
46             use v5.36;
47             use Device::Serial::SLuRM;
48              
49             my $slurm = Device::Serial::SLuRM->new(
50             dev => "/dev/ttyUSB0",
51             baud => 19200,
52             );
53              
54             $slurm->run(
55             on_notify => sub ($payload) {
56             printf "NOTIFY: %v02X\n", $payload;
57             }
58             )->await;
59              
60             =head1 DESCRIPTION
61              
62             This module provides a L<Future>-based interface for communicating with
63             a peer device on a serial port (or similar device handle) which talks the
64             SLµRM messaging protocol. It supports sending and receiving of NOTIFY
65             packets, and sending of REQUEST packets that receive a RESPONSE.
66              
67             It currently does not support receiving REQUESTs, though this could be added
68             relatively easily.
69              
70             =head2 SLµRM
71              
72             SLµRM ("Serial Link Microcontroller Reliable Messaging") is a simple
73             bidirectional communication protocol for adding reliable message framing and
74             request/response semantics to byte-based data links (such as asynchronous
75             serial ports), which may themselves be somewhat unreliable. SLµRM can tolerate
76             bytes arriving corrupted or going missing altogether, or additional noise
77             bytes being received, while still maintaining a reliable bidirectional flow of
78             messages. There are two main kinds of message flows - NOTIFYs and REQUESTs. In
79             all cases, packet payloads can be of a variable length (including zero bytes),
80             and the protocol itself does not put semantic meaning on those bytes - they
81             are free for the application to use as required.
82              
83             A NOTIFY message is a simple notification from one peer to the other, that
84             does not yield a response.
85              
86             A REQUEST message carries typically some sort of command instruction, to which
87             the peer should respond with a RESPONSE or ERR packet. Replies to a REQUEST
88             message do not have to be sent sequentially.
89              
90             The F directory of this distribution contains more detailed protocol
91             documentation which may be useful for writing other implementations.
92              
93             The F directory of this distribution contains a reference
94             implementation in C for 8-bit microcontrollers, such as AVR ATtiny and ATmega
95             chips.
96              
97             =cut
98              
99             =head2 Metrics
100              
101             If L<Metrics::Any> is available, this module additionally provides metrics
102             under the namespace prefix of C<slurm>. The following metrics are provided:
103              
104             =over 4
105              
106             =item discards
107              
108             An unlabelled counter tracking the number of times a received packet is
109             discarded due to failing CRC check.
110              
111             =item packets
112              
113             A counter, labelled by direction and packet type, tracking the number of
114             packets sent and received of each type.
115              
116             =item retransmits
117              
118             An unlabelled counter tracking the number of times a (REQUEST) packet had to
119             be retransmitted after the initial one timed out.
120              
121             =item serial_bytes
122              
123             A counter, labelled by direction, tracking the number of bytes sent and
124             received directly over the serial port. The rate of this can be used to
125             calculate overall serial link utilisation.
126              
127             =item timeouts
128              
129             An unlabelled counter tracking the number of times a request transaction was
130             abandoned entirely due to a timeout. This does I<not> count transactions that
131             eventually succeeded after intermediate timeouts and retransmissions.
132              
133             =back
134              
135             =cut
136              
137             # Metrics support is entirely optional
138             our $METRICS;
139             eval {
140             require Metrics::Any and Metrics::Any->VERSION( '0.05' ) and
141             Metrics::Any->import( '$METRICS', name_prefix => [ 'slurm' ] );
142             };
143              
144             my %PKTTYPE_NAME;
145              
146             if( defined $METRICS ) {
147             $METRICS->make_counter( discards =>
148             description => "Number of received packets discarded due to CRC check",
149             );
150              
151             $METRICS->make_counter( packets =>
152             description => "Number of SLµRM packets sent and received, by type",
153             labels => [qw( dir type )],
154             );
155              
156             $METRICS->make_distribution( request_success_attempts =>
157             description => "How many requests eventually succeeded after a given number of transmissions",
158             units => "",
159             buckets => [ 1 .. 3 ],
160             );
161             $METRICS->make_counter( retransmits =>
162             description => "Number of retransmits of packets",
163             );
164              
165             $METRICS->make_counter( serial_bytes =>
166             description => "Total number of bytes sent and received on the serial link",
167             labels => [qw( dir )],
168             );
169              
170             $METRICS->make_counter( timeouts =>
171             description => "Number of transactions that were abandoned due to eventual timeout",
172             );
173              
174             %PKTTYPE_NAME = map { __PACKAGE__->can( "SLURM_PKTCTRL_$_" )->() => $_ }
175             qw( META NOTIFY REQUEST RESPONSE ERR ACK );
176              
177             # Keep prometheus increase() happy by initialising all the counters to zero
178             $METRICS->inc_counter_by( discards => 0 );
179             foreach my $dir (qw( rx tx )) {
180             $METRICS->inc_counter_by( packets => 0, [ dir => $dir, type => $_ ] ) for values %PKTTYPE_NAME;
181             $METRICS->inc_counter_by( serial_bytes => 0, [ dir => $dir ] );
182             }
183             $METRICS->inc_counter_by( retransmits => 0 );
184             $METRICS->inc_counter_by( timeouts => 0 );
185             }
186              
187             =head1 PARAMETERS
188              
189             =head2 dev
190              
191             dev => PATH
192              
193             Path to the F</dev> device node representing the serial port used for this
194             communication. This will be opened via L<IO::Termios> and configured into the
195             appropriate mode and baud rate.
196              
197             =head2 baud
198              
199             baud => NUM
200              
201             Optional baud rate to set for communication when opening a device node.
202              
203             SLµRM does not specify a particular rate, but a default value of 115.2k will
204             apply if left unspecified.
205              
206             =head2 fh
207              
208             fh => IO
209              
210             An IO handle directly to the serial port device to be used for reading and
211             writing. It will be assumed to be set up correctly; no further setup will be
212             performed.
213              
214             Either C<dev> or C<fh> is required.
215              
216             =head2 retransmit_delay
217              
218             retransmit_delay => NUM
219              
220             Optional delay in seconds to wait after a non-response of a REQUEST packet
221             before sending it again. A default of 50msec (0.05) will apply if not
222             specified.
223              
224             Applications that transfer large amounts of data over slow links, or for which
225             responding to a command may take a long time, should increase this value.
226              
227             =head2 retransmit_count
228              
229             retransmit_count => NUM
230              
231             Optional number of additional attempts to try sending REQUEST packets before
232             giving up entirely. A default of 2 will apply if not specified (thus each
233             C<request> method will make up to 3 attempts).
234              
235             =cut
236              
237             field $_fh :param { undef };
238              
239             ADJUST :params (
240             :$dev = undef,
241             :$baud = undef,
242             ) {
243             if( defined $_fh ) {
244             # fine
245             }
246             elsif( defined $dev ) {
247             require IO::Termios;
248              
249             $baud //= 115200;
250              
251             $_fh = IO::Termios->open( $dev, "$baud,8,n,1" ) or
252             croak "Cannot open device $dev - $!";
253              
254             $_fh->cfmakeraw;
255             }
256             else {
257             croak "Require either a 'dev' or 'fh' parameter";
258             }
259             }
260              
261             field $_retransmit_delay :param { 0.05 };
262             field $_retransmit_count :param { 2 };
263              
264             field $_on_notify;
265              
266             field $_did_reset;
267              
268             field $_seqno_tx { 0 };
269 1     1   5320 field $_seqno_rx :reader(_seqno_rx); # :reader just for unit-test purposes
  1         7  
270              
271             =head1 METHODS
272              
273             =cut
274              
275             =head2 recv_packet
276              
277             ( $pktctrl, $payload ) = await $slurm->recv_packet;
278              
279             Waits for and returns the next packet to be received from the serial port.
280              
281             =cut
282              
283             field $_recv_buffer;
284              
285             field $_next_resetack_f;
286              
287             async method recv_packet
288 42         117 {
289             $_recv_buffer //= Future::Buffer->new(
290             fill => $METRICS
291             ? sub {
292             Future::IO->sysread( $_fh, 8192 )
293 0     0   0 ->on_done( sub { $METRICS->inc_counter_by( serial_bytes => length $_[0], [ dir => "rx" ] ) } );
  0         0  
294             }
295 35     35   1289 : sub { Future::IO->sysread( $_fh, 8192 ) },
296 42 50 66     163 );
297              
298             PACKET: {
299 42         147 await $_recv_buffer->read_until( qr/\x55/ );
  45         220  
300              
301 33         34112 my ( $pktctrl, $len ) = unpack "C C", my $pkt = await $_recv_buffer->read_exactly( 3 );
302              
303 33 100       2026 if( crc8( $pkt ) != 0 ) {
304             # Header checksum failed
305 1 50       79 $METRICS and
306             $METRICS->inc_counter( discards => );
307              
308 1 50       5 $_recv_buffer->unread( $pkt ) if $pkt =~ m/\x55/;
309 1         4 redo PACKET;
310             }
311              
312 32         1184 $pkt .= await $_recv_buffer->read_exactly( $len + 1 );
313              
314 32 100       3044 if( crc8( $pkt ) != 0 ) {
315             # Body checksum failed
316 2 50       62 $METRICS and
317             $METRICS->inc_counter( discards => );
318              
319 2 100       15 $_recv_buffer->unread( $pkt ) if $pkt =~ m/\x55/;
320 2         35 redo PACKET;
321             }
322              
323 30         923 my $payload = substr( $pkt, 3, $len );
324              
325 30         48 printf STDERR "SLuRM <-RX {%02X/%v02X}\n", $pktctrl, $payload
326             if DEBUG > 1;
327              
328             $METRICS and
329 30 50 0     68 $METRICS->inc_counter( packets => [ dir => "rx", type => $PKTTYPE_NAME{ $pktctrl & 0xF0 } // "UNKNOWN" ] );
330              
331 30         247 return $pktctrl, $payload;
332             }
333 42     42 1 24246 }
334              
335             field @_pending_slots; # [$seqno] = { payload, response_f }
336             field $_run_f;
337              
338             async method _run
339 12         49 {
340 12         20 while(1) {
341 31         8622 my ( $pktctrl, $payload ) = await $self->recv_packet;
342 19         1280 my $seqno = $pktctrl & 0x0F;
343 19         39 $pktctrl &= 0xF0;
344              
345 19 100       48 if( $pktctrl == SLURM_PKTCTRL_META ) {
346 4 50 66     28 if( $seqno == SLURM_PKTCTRL_META_RESET or
347             $seqno == SLURM_PKTCTRL_META_RESETACK ) {
348 4         15 ( $_seqno_rx ) = unpack "C", $payload;
349              
350 4 100       13 if( $seqno == SLURM_PKTCTRL_META_RESET ) {
351 1         5 await $self->send_packet( SLURM_PKTCTRL_META_RESETACK, pack "C", $_seqno_tx );
352             }
353             else {
354 3 50       12 $_next_resetack_f->done if $_next_resetack_f;
355             }
356             }
357             else {
358 0         0 warn sprintf "No idea what to do with pktctrl(meta) = %02X\n", $seqno;
359             }
360              
361 4         3124 next;
362             }
363              
364 15         26 my $is_dup;
365 15 100       39 if( !( $pktctrl & 0x80 ) ) {
366 10 50       22 if( defined $_seqno_rx ) {
367 10         20 my $seqdiff = $seqno - $_seqno_rx;
368 10 100       21 $seqdiff += 16 if $seqdiff < 0;
369 10   100     38 $is_dup = !$seqdiff || $seqdiff > 8; # suppress duplicates / backsteps
370             }
371              
372 10         18 $_seqno_rx = $seqno;
373             }
374              
375             match( $pktctrl : == ) {
376             case( SLURM_PKTCTRL_NOTIFY ) {
377 10 100       22 next if $is_dup;
378              
379 8         10 printf STDERR "SLuRM rx-NOTIFY(%d): %v02X\n", $seqno, $payload
380             if DEBUG;
381              
382 8 50       27 $_on_notify ? $_on_notify->( $payload )
383             : warn "Received NOTIFY packet with no handler\n";
384             }
385              
386             case( SLURM_PKTCTRL_RESPONSE ),
387             case( SLURM_PKTCTRL_ERR ) {
388 5         11 my $slot = $_pending_slots[$seqno];
389 5 50       14 unless( $slot ) {
390 0         0 warn "Received reply to unsent request seqno=$seqno\n";
391 0         0 next;
392             }
393              
394 5 100       11 if( $pktctrl == SLURM_PKTCTRL_RESPONSE ) {
395 4         23 printf STDERR "SLuRM rx-RESPONSE(%d): %v02X\n", $seqno, $payload
396             if DEBUG;
397              
398 4         34 $slot->{response_f}->done( $payload );
399             }
400             else {
401 1         7 printf STDERR "SLuRM rx-ERR(%d): %v02X\n", $seqno, $payload
402             if DEBUG;
403              
404 1 50       13 my $message = sprintf "Received ERR packet <%v02X%s>",
405             substr( $payload, 0, 3 ),
406             length $payload > 3 ? "..." : "";
407 1         13 $slot->{response_f}->fail( $message, slurm => $payload );
408             }
409 5         750 $slot->{retransmit_f}->cancel;
410              
411             $METRICS and
412 5 50       214 $METRICS->report_distribution( request_success_attempts => 1 + $_retransmit_count - $slot->{retransmit_count} );
413              
414 5         10 undef $_pending_slots[$seqno];
415              
416 5         10 printf STDERR "SLuRM tx-ACK(%d)\n", $seqno
417             if DEBUG;
418              
419 5         16 await $self->send_packet_twice( SLURM_PKTCTRL_ACK | $seqno, "" );
420             }
421             default {
422 0         0 die sprintf "TODO: Received unrecognised packet type=%02X\n", $pktctrl;
423             }
424             }
425 15 100 66     52 }
    50          
426 12     12   26 }
427              
428             # undocumented but useful for unit tests
429             method _start
430 15     15   324 {
431 15   66 0   69 $_run_f //= $self->_run->on_fail( sub { die "Device::Serial::SLuRM runloop failed: $_[0]" } );
  0         0  
432              
433 15         16153 return $_run_f;
434             }
435              
436             =head2 run
437              
438             $run_f = $slurm->run( %args );
439              
440             Starts the receiver run-loop, which can be used to wait for incoming NOTIFY
441             packets. This method returns a future, but the returned future will not
442             complete in normal circumstances. It will remain pending while the run-loop is
443             running. If an unrecoverable error happens (such as an IO error on the
444             underlying serial port device) then this future will fail.
445              
446             Takes the following named arguments:
447              
448             =over 4
449              
450             =item on_notify => CODE
451              
452             $on_notify->( $payload )
453              
454             Optional. Invoked on receipt of a NOTIFY packet.
455              
456             =back
457              
458             Will automatically L</reset> first if required.
459              
460             =cut
461              
462 5         11 async method run ( %args )
  5         14  
  5         7  
463 5         16 {
464 5         11 $_on_notify = $args{on_notify}; # TODO: save old, restore on exit?
465              
466 5 100       14 $_did_reset or
467             await $self->reset;
468              
469             await $self->_start
470 5     5   1035 ->on_cancel( sub { undef $_on_notify } );
  5         85  
471 5     5 1 17522 }
472              
473             =head2 stop
474              
475             $slurm->stop;
476              
477             Stops the receiver run-loop, if running, causing its future to be cancelled.
478              
479             It is not an error to call this method if the run loop is not running.
480              
481             =cut
482              
483             method stop
484 12     12 1 40790 {
485 12 50       41 return unless $_run_f;
486              
487 12 50       25 eval { $_run_f->cancel } or warn "Failed to ->cancel the runloop future - $@";
  12         64  
488 12         3199 undef $_run_f;
489             }
490              
491             =head2 send_packet
492              
493             await $slurm->send_packet( $pktctrl, $payload );
494              
495             Sends a packet to the serial port.
496              
497             =cut
498              
499 31         52 async method send_packet ( $pktctrl, $payload )
  31         52  
  31         49  
  31         41  
500 31         77 {
501 31         47 printf STDERR "SLuRM TX-> {%02X/%v02X}\n", $pktctrl, $payload
502             if DEBUG > 1;
503              
504 31         106 my $bytes = pack( "C C", $pktctrl, length $payload );
505 31         90 $bytes .= pack( "C", crc8( $bytes ) );
506              
507 31         997 $bytes .= $payload;
508 31         83 $bytes .= pack( "C", crc8( $bytes ) );
509              
510             $METRICS and
511 31 50 0     856 $METRICS->inc_counter( packets => [ dir => "tx", type => $PKTTYPE_NAME{ $pktctrl & 0xF0 } // "UNKNOWN" ] );
512 31 50       69 $METRICS and
513             $METRICS->inc_counter_by( serial_bytes => 1 + length $bytes, [ dir => "tx" ] );
514              
515 31         147 return await Future::IO->syswrite_exactly( $_fh, "\x55" . $bytes );
516 31     31 1 3428 }
517              
518 10         18 async method send_packet_twice ( $pktctrl, $payload )
  10         17  
  10         17  
  10         13  
519 10         30 {
520 10         32 await $self->send_packet( $pktctrl, $payload );
521             # TODO: Send again after a short delay
522 10         36946 await $self->send_packet( $pktctrl, $payload );
523 10     10 0 24 }
524              
525             =head2 reset
526              
527             $slurm->reset;
528              
529             Resets the transmitter sequence number and sends a META-RESET packet.
530              
531             It is not normally required to explicitly call this, as the first call to
532             L</run>, L</send_notify> or L</request> will do it if required.
533              
534             =cut
535              
536             async method reset
537 3         13 {
538 3         6 $_seqno_tx = 0;
539              
540 3         17 await $self->send_packet_twice( SLURM_PKTCTRL_META_RESET, pack "C", $_seqno_tx );
541 3         5605 $_did_reset = 1;
542              
543 3         15 $self->_start;
544              
545             # TODO: These might collide, do we need a Queue?
546 3         12 await $_next_resetack_f = $_run_f->new;
547 3         216 undef $_next_resetack_f;
548 3     3 1 3587 }
549              
550             =head2 send_notify
551              
552             await $slurm->send_notify( $payload )
553              
554             Sends a NOTIFY packet.
555              
556             Will automatically L</reset> first if required.
557              
558             =cut
559              
560 2         21 async method send_notify ( $payload )
  2         4  
  2         28  
561 2         8 {
562 2 50       10 $_did_reset or
563             await $self->reset;
564              
565 2         7 ( $_seqno_tx += 1 ) &= 0x0F;
566 2         4 my $pktctrl = SLURM_PKTCTRL_NOTIFY | $_seqno_tx;
567              
568 2         7 await $self->send_packet_twice( $pktctrl, $payload );
569 2     2 1 8319 }
570              
571             =head2 request
572              
573             $data_in = await $slurm->request( $data_out );
574              
575             Sends a REQUEST packet, and waits for a response to it.
576              
577             If the peer responds with an ERR packet, the returned future will fail with
578             an error message, the category of C<slurm>, and the payload body of the ERR
579             packet in the message details:
580              
581             $f->fail( $message, slurm => $payload );
582              
583             If the peer does not respond at all and all retransmit attempts end in a
584             timeout, the returned future will fail the same way but with C<undef> as the
585             message details:
586              
587             $f->fail( $message, slurm => undef );
588              
589             Will automatically L</reset> first if required.
590              
591             =cut
592              
593 6         11 async method request ( $payload )
  6         10  
  6         10  
594 6         25 {
595 6 100       18 $_did_reset or
596             await $self->reset;
597              
598 6         88 ( $_seqno_tx += 1 ) &= 0x0F;
599 6         10 my $seqno = $_seqno_tx;
600              
601 6         10 printf STDERR "SLuRM tx-REQUEST(%d): %v02X\n", $seqno, $payload
602             if DEBUG;
603              
604 6 50       17 $_pending_slots[$seqno] and croak "TODO: Request seqno collision - pick a new one?";
605              
606 6         11 my $pktctrl = SLURM_PKTCTRL_REQUEST | $seqno;
607              
608 6         22 await $self->send_packet( $pktctrl, $payload );
609              
610 6         8874 $self->_start;
611              
612 6         19 $_pending_slots[$seqno] = {
613             payload => $payload,
614             response_f => my $f = $_run_f->new,
615             retransmit_count => $_retransmit_count,
616             };
617              
618 6         67 $self->_set_retransmit( $seqno );
619              
620 6         7053 return await $f;
621 6     6 1 6476 }
622              
623 9         15 method _set_retransmit ( $seqno )
  9         15  
  9         13  
624 9     9   31 {
625 9 50       30 my $slot = $_pending_slots[$seqno] or die "ARG expected $seqno request";
626              
627             $slot->{retransmit_f} = Future::IO->sleep( $_retransmit_delay )
628             ->on_done( sub {
629 4 100   4   2643 if( $slot->{retransmit_count}-- ) {
630 3         6 printf STDERR "SLuRM retransmit REQUEST(%d)\n", $seqno
631             if DEBUG;
632              
633 3         8 my $pktctrl = SLURM_PKTCTRL_REQUEST | $seqno;
634             $slot->{retransmit_f} = $self->send_packet( $pktctrl, $slot->{payload} )
635             ->on_fail( sub {
636 0         0 warn "Retransmit failed: @_";
637 0         0 $slot->{response_f}->fail( @_ );
638             } )
639             ->on_done( sub {
640 3         744 $self->_set_retransmit( $seqno );
641 3         9 } );
642              
643 3 50       4014 $METRICS and
644             $METRICS->inc_counter( retransmits => );
645             }
646             else {
647 1         2 printf STDERR "SLuRM timeout REQUEST(%d)\n", $seqno
648             if DEBUG;
649              
650 1         6 my $message = sprintf "Request timed out after %d attempts\n", 1 + $_retransmit_count;
651 1         6 $slot->{response_f}->fail( $message, slurm => undef );
652              
653 1 50       205 $METRICS and
654             $METRICS->inc_counter( timeouts => );
655              
656 1         4 undef $_pending_slots[$seqno];
657             }
658 9         34 });
659             }
660              
661             =head1 AUTHOR
662              
663             Paul Evans
664              
665             =cut
666              
667             0x55AA;