File Coverage

blib/lib/Device/Serial/SLuRM.pm
Criterion Covered Total %
statement 181 190 95.2
branch 49 72 68.0
condition 12 21 57.1
subroutine 26 28 92.8
pod 7 8 87.5
total 275 319 86.2


line stmt bran cond sub pod time code
1             # You may distribute under the terms of either the GNU General Public License
2             # or the Artistic License (the same terms as Perl itself)
3             #
4             # (C) Paul Evans, 2022 -- leonerd@leonerd.org.uk
5              
6 6     6   599397 use v5.26;
  6         37  
7 6     6   3705 use Object::Pad 0.73 ':experimental(init_expr adjust_params)';
  6         56194  
  6         28  
8              
9             package Device::Serial::SLuRM 0.04;
10             class Device::Serial::SLuRM;
11              
12 6     6   2578 use Carp;
  6         18  
  6         336  
13              
14 6     6   3251 use Syntax::Keyword::Match;
  6         4594  
  6         30  
15              
16 6     6   862 use Future::AsyncAwait;
  6         16486  
  6         30  
17 6     6   3122 use Future::Buffer 0.03;
  6         11683  
  6         207  
18 6     6   527 use Future::IO;
  6         12672  
  6         215  
19              
20 6     6   2970 use Digest::CRC qw( crc8 );
  6         16411  
  6         488  
21 6     6   47 use Time::HiRes qw( gettimeofday tv_interval );
  6         11  
  6         38  
22              
23 6   50 6   846 use constant DEBUG => $ENV{SLURM_DEBUG} // 0;
  6         15  
  6         591  
24              
25             use constant {
26 6         36680 SLURM_PKTCTRL_META => 0x00,
27             SLURM_PKTCTRL_META_RESET => 0x01,
28             SLURM_PKTCTRL_META_RESETACK => 0x02,
29              
30             SLURM_PKTCTRL_NOTIFY => 0x10,
31              
32             SLURM_PKTCTRL_REQUEST => 0x30,
33              
34             SLURM_PKTCTRL_RESPONSE => 0xB0,
35             SLURM_PKTCTRL_ACK => 0xC0,
36             SLURM_PKTCTRL_ERR => 0xE0,
37 6     6   44 };
  6         12  
38              
39             =encoding UTF-8
40              
41             =head1 NAME
42              
43             C<Device::Serial::SLuRM> - communicate the SLµRM protocol over a serial port
44              
45             =head1 SYNOPSIS
46              
47             use v5.36;
48             use Device::Serial::SLuRM;
49              
50             my $slurm = Device::Serial::SLuRM->new(
51             dev => "/dev/ttyUSB0",
52             baud => 19200,
53             );
54              
55             $slurm->run(
56             on_notify => sub ($payload) {
57             printf "NOTIFY: %v02X\n", $payload;
58             }
59             )->await;
60              
61             =head1 DESCRIPTION
62              
63             This module provides a L<Future::IO>-based interface for communicating with
64             a peer device on a serial port (or similar device handle) which talks the
65             SLµRM messaging protocol. It supports sending and receiving of NOTIFY
66             packets, and sending of REQUEST packets that receive a RESPONSE.
67              
68             It currently does not support receiving REQUESTs, though this could be added
69             relatively easily.
70              
71             =head2 SLµRM
72              
73             SLµRM ("Serial Link Microcontroller Reliable Messaging") is a simple
74             bidirectional communication protocol for adding reliable message framing and
75             request/response semantics to byte-based data links (such as asynchronous
76             serial ports), which may themselves be somewhat unreliable. SLµRM can tolerate
77             bytes arriving corrupted or going missing altogether, or additional noise
78             bytes being received, while still maintaining a reliable bidirectional flow of
79             messages. There are two main kinds of message flows - NOTIFYs and REQUESTs. In
80             all cases, packet payloads can be of a variable length (including zero bytes),
81             and the protocol itself does not put semantic meaning on those bytes - they
82             are free for the application to use as required.
83              
84             A NOTIFY message is a simple notification from one peer to the other, that
85             does not yield a response.
86              
87             A REQUEST message carries typically some sort of command instruction, to which
88             the peer should respond with a RESPONSE or ERR packet. Replies to a REQUEST
89             message do not have to be sent sequentially.
90              
91             The F directory of this distribution contains more detailed protocol
92             documentation which may be useful for writing other implementations.
93              
94             The F directory of this distribution contains a reference
95             implementation in C for 8-bit microcontrollers, such as AVR ATtiny and ATmega
96             chips.
97              
98             =cut
99              
100             =head2 Metrics
101              
102             If L<Metrics::Any> is available, this module additionally provides metrics
103             under the namespace prefix of C<slurm>. The following metrics are provided:
104              
105             =over 4
106              
107             =item discards
108              
109             An unlabelled counter tracking the number of times a received packet is
110             discarded due to failing CRC check.
111              
112             =item packets
113              
114             A counter, labelled by direction and packet type, tracking the number of
115             packets sent and received of each type.
116              
117             =item retransmits
118              
119             An unlabelled counter tracking the number of times a (REQUEST) packet had to
120             be retransmitted after the initial one timed out.
121              
122             =item serial_bytes
123              
124             A counter, labelled by direction, tracking the number of bytes sent and
125             received directly over the serial port. The rate of this can be used to
126             calculate overall serial link utilisation.
127              
128             =item timeouts
129              
130             An unlabelled counter tracking the number of times a request transaction was
131             abandoned entirely due to a timeout. This does I<not> count transactions that
132             eventually succeeded after intermediate timeouts and retransmissions.
133              
134             =back
135              
136             =cut
137              
138             # Metrics support is entirely optional
139             our $METRICS;
140             eval {
141             require Metrics::Any and Metrics::Any->VERSION( '0.05' ) and
142             Metrics::Any->import( '$METRICS', name_prefix => [ 'slurm' ] );
143             };
144              
145             my %PKTTYPE_NAME;
146              
147             if( defined $METRICS ) {
148             $METRICS->make_counter( discards =>
149             description => "Number of received packets discarded due to CRC check",
150             );
151              
152             $METRICS->make_counter( packets =>
153             description => "Number of SLµRM packets sent and received, by type",
154             labels => [qw( dir type )],
155             );
156              
157             $METRICS->make_distribution( request_success_attempts =>
158             description => "How many requests eventually succeeded after a given number of transmissions",
159             units => "",
160             buckets => [ 1 .. 3 ],
161             );
162              
163             $METRICS->make_timer( request_duration =>
164             description => "How long it took to get a response to each request",
165             buckets => [ 0.001, 0.002, 0.005, 0.01, 0.02, 0.05, 0.1, 0.2, 0.5 ],
166             );
167              
168             $METRICS->make_counter( retransmits =>
169             description => "Number of retransmits of packets",
170             );
171              
172             $METRICS->make_counter( serial_bytes =>
173             description => "Total number of bytes sent and received on the serial link",
174             labels => [qw( dir )],
175             );
176              
177             $METRICS->make_counter( timeouts =>
178             description => "Number of transactions that were abandoned due to eventual timeout",
179             );
180              
181             %PKTTYPE_NAME = map { __PACKAGE__->can( "SLURM_PKTCTRL_$_" )->() => $_ }
182             qw( META NOTIFY REQUEST RESPONSE ERR ACK );
183              
184             # Keep prometheus increase() happy by initialising all the counters to zero
185             $METRICS->inc_counter_by( discards => 0 );
186             foreach my $dir (qw( rx tx )) {
187             $METRICS->inc_counter_by( packets => 0, [ dir => $dir, type => $_ ] ) for values %PKTTYPE_NAME;
188             $METRICS->inc_counter_by( serial_bytes => 0, [ dir => $dir ] );
189             }
190             $METRICS->inc_counter_by( retransmits => 0 );
191             $METRICS->inc_counter_by( timeouts => 0 );
192             }
193              
194             =head1 PARAMETERS
195              
196             =head2 dev
197              
198             dev => PATH
199              
200             Path to the F device node representing the serial port used for this
201             communication. This will be opened via L<IO::Termios> and configured into the
202             appropriate mode and baud rate.
203              
204             =head2 baud
205              
206             baud => NUM
207              
208             Optional baud rate to set for communication when opening a device node.
209              
210             SLµRM does not specify a particular rate, but a default value of 115.2k will
211             apply if left unspecified.
212              
213             =head2 fh
214              
215             fh => IO
216              
217             An IO handle directly to the serial port device to be used for reading and
218             writing. It will be assumed to be set up correctly; no further setup will be
219             performed.
220              
221             Either C<dev> or C<fh> is required.
222              
223             =head2 retransmit_delay
224              
225             retransmit_delay => NUM
226              
227             Optional delay in seconds to wait after a non-response of a REQUEST packet
228             before sending it again. A default of 50msec (0.05) will apply if not
229             specified.
230              
231             Applications that transfer large amounts of data over slow links, or for which
232             responding to a command may take a long time, should increase this value.
233              
234             =head2 retransmit_count
235              
236             retransmit_count => NUM
237              
238             Optional number of additional attempts to try sending REQUEST packets before
239             giving up entirely. A default of 2 will apply if not specified (thus each
240             C<request> method will make up to 3 attempts).
241              
242             =cut
243              
244             field $_fh :param = undef;
245              
246             ADJUST :params (
247             :$dev = undef,
248             :$baud //= 115200,
249             ) {
250             if( defined $_fh ) {
251             # fine
252             }
253             elsif( defined $dev ) {
254             require IO::Termios;
255              
256             $_fh = IO::Termios->open( $dev, "$baud,8,n,1" ) or
257             croak "Cannot open device $dev - $!";
258              
259             $_fh->cfmakeraw;
260             }
261             else {
262             croak "Require either a 'dev' or 'fh' parameter";
263             }
264             }
265              
266             field $_retransmit_delay :param = 0.05;
267             field $_retransmit_count :param = 2;
268              
269             field $_on_notify;
270              
271             field $_did_reset;
272              
273             field $_seqno_tx = 0 ;
274 1     1   5336 field $_seqno_rx :reader(_seqno_rx); # :reader just for unit-test purposes
  1         6  
275              
276             =head1 METHODS
277              
278             =cut
279              
280             =head2 recv_packet
281              
282             ( $pktctrl, $payload ) = await $slurm->recv_packet;
283              
284             Waits for and returns the next packet to be received from the serial port.
285              
286             =cut
287              
288             field $_recv_buffer;
289              
290             field $_next_resetack_f;
291              
292             async method recv_packet
293 42         120 {
294             $_recv_buffer //= Future::Buffer->new(
295             fill => $METRICS
296             ? sub {
297             Future::IO->sysread( $_fh, 8192 )
298 0     0   0 ->on_done( sub { $METRICS->inc_counter_by( serial_bytes => length $_[0], [ dir => "rx" ] ) } );
  0         0  
299             }
300 35     35   1210 : sub { Future::IO->sysread( $_fh, 8192 ) },
301 42 50 66     169 );
302              
303             PACKET: {
304 42         143 await $_recv_buffer->read_until( qr/\x55/ );
  45         213  
305              
306 33         34730 my ( $pktctrl, $len ) = unpack "C C", my $pkt = await $_recv_buffer->read_exactly( 3 );
307              
308 33 100       2118 if( crc8( $pkt ) != 0 ) {
309             # Header checksum failed
310 1 50       35 $METRICS and
311             $METRICS->inc_counter( discards => );
312              
313 1 50       5 $_recv_buffer->unread( $pkt ) if $pkt =~ m/\x55/;
314 1         3 redo PACKET;
315             }
316              
317 32         1219 $pkt .= await $_recv_buffer->read_exactly( $len + 1 );
318              
319 32 100       3206 if( crc8( $pkt ) != 0 ) {
320             # Body checksum failed
321 2 50       61 $METRICS and
322             $METRICS->inc_counter( discards => );
323              
324 2 100       13 $_recv_buffer->unread( $pkt ) if $pkt =~ m/\x55/;
325 2         35 redo PACKET;
326             }
327              
328 30         916 my $payload = substr( $pkt, 3, $len );
329              
330 30         46 printf STDERR "SLuRM <-RX {%02X/%v02X}\n", $pktctrl, $payload
331             if DEBUG > 1;
332              
333             $METRICS and
334 30 50 0     72 $METRICS->inc_counter( packets => [ dir => "rx", type => $PKTTYPE_NAME{ $pktctrl & 0xF0 } // "UNKNOWN" ] );
335              
336 30         339 return $pktctrl, $payload;
337             }
338 42     42 1 24734 }
339              
340             field @_pending_slots; # [$seqno] = { payload, response_f }
341             field $_run_f;
342              
343             async method _run
344 12         29 {
345 12         20 while(1) {
346 31         8360 my ( $pktctrl, $payload ) = await $self->recv_packet;
347 19         1287 my $seqno = $pktctrl & 0x0F;
348 19         35 $pktctrl &= 0xF0;
349              
350 19 100       46 if( $pktctrl == SLURM_PKTCTRL_META ) {
351 4 50 66     22 if( $seqno == SLURM_PKTCTRL_META_RESET or
352             $seqno == SLURM_PKTCTRL_META_RESETACK ) {
353 4         15 ( $_seqno_rx ) = unpack "C", $payload;
354              
355 4 100       15 if( $seqno == SLURM_PKTCTRL_META_RESET ) {
356 1         5 await $self->send_packet( SLURM_PKTCTRL_META_RESETACK, pack "C", $_seqno_tx );
357             }
358             else {
359 3 50       13 $_next_resetack_f->done if $_next_resetack_f;
360             }
361             }
362             else {
363 0         0 warn sprintf "No idea what to do with pktctrl(meta) = %02X\n", $seqno;
364             }
365              
366 4         3141 next;
367             }
368              
369 15         26 my $is_dup;
370 15 100       35 if( !( $pktctrl & 0x80 ) ) {
371 10 50       22 if( defined $_seqno_rx ) {
372 10         18 my $seqdiff = $seqno - $_seqno_rx;
373 10 100       20 $seqdiff += 16 if $seqdiff < 0;
374 10   100     37 $is_dup = !$seqdiff || $seqdiff > 8; # suppress duplicates / backsteps
375             }
376              
377 10         17 $_seqno_rx = $seqno;
378             }
379              
380             match( $pktctrl : == ) {
381             case( SLURM_PKTCTRL_NOTIFY ) {
382 10 100       20 next if $is_dup;
383              
384 8         9 printf STDERR "SLuRM rx-NOTIFY(%d): %v02X\n", $seqno, $payload
385             if DEBUG;
386              
387 8 50       24 $_on_notify ? $_on_notify->( $payload )
388             : warn "Received NOTIFY packet with no handler\n";
389             }
390              
391             case( SLURM_PKTCTRL_RESPONSE ),
392             case( SLURM_PKTCTRL_ERR ) {
393 5         13 my $slot = $_pending_slots[$seqno];
394 5 50       12 unless( $slot ) {
395 0         0 warn "Received reply to unsent request seqno=$seqno\n";
396 0         0 next;
397             }
398              
399             $METRICS and
400 5 50       12 $METRICS->report_timer( request_duration => tv_interval $slot->{start_time} );
401              
402 5 100       45 if( $pktctrl == SLURM_PKTCTRL_RESPONSE ) {
403 4         29 printf STDERR "SLuRM rx-RESPONSE(%d): %v02X\n", $seqno, $payload
404             if DEBUG;
405              
406 4         17 $slot->{response_f}->done( $payload );
407             }
408             else {
409 1         2 printf STDERR "SLuRM rx-ERR(%d): %v02X\n", $seqno, $payload
410             if DEBUG;
411              
412 1 50       17 my $message = sprintf "Received ERR packet <%v02X%s>",
413             substr( $payload, 0, 3 ),
414             length $payload > 3 ? "..." : "";
415 1         9 $slot->{response_f}->fail( $message, slurm => $payload );
416             }
417 5         839 $slot->{retransmit_f}->cancel;
418              
419             $METRICS and
420 5 50       223 $METRICS->report_distribution( request_success_attempts => 1 + $_retransmit_count - $slot->{retransmit_count} );
421              
422 5         10 undef $_pending_slots[$seqno];
423              
424 5         7 printf STDERR "SLuRM tx-ACK(%d)\n", $seqno
425             if DEBUG;
426              
427 5         14 await $self->send_packet_twice( SLURM_PKTCTRL_ACK | $seqno, "" );
428             }
429             default {
430 0         0 die sprintf "TODO: Received unrecognised packet type=%02X\n", $pktctrl;
431             }
432             }
433 15 100 66     60 }
    50          
434 12     12   22 }
435              
436             # undocumented but useful for unit tests
437             method _start
438 15     15   307 {
439 15   66 0   75 $_run_f //= $self->_run->on_fail( sub { die "Device::Serial::SLuRM runloop failed: $_[0]" } );
  0         0  
440              
441 15         16366 return $_run_f;
442             }
443              
444             =head2 run
445              
446             $run_f = $slurm->run( %args );
447              
448             Starts the receiver run-loop, which can be used to wait for incoming NOTIFY
449             packets. This method returns a future, but the returned future will not
450             complete in normal circumstances. It will remain pending while the run-loop is
451             running. If an unrecoverable error happens (such as an IO error on the
452             underlying serial port device) then this future will fail.
453              
454             Takes the following named arguments:
455              
456             =over 4
457              
458             =item on_notify => CODE
459              
460             $on_notify->( $payload )
461              
462             Optional. Invoked on receipt of a NOTIFY packet.
463              
464             =back
465              
466             Will automatically L</reset> first if required.
467              
468             =cut
469              
470 5         8 async method run ( %args )
  5         12  
  5         9  
471 5         19 {
472 5         11 $_on_notify = $args{on_notify}; # TODO: save old, restore on exit?
473              
474 5 100       13 $_did_reset or
475             await $self->reset;
476              
477             await $self->_start
478 5     5   1031 ->on_cancel( sub { undef $_on_notify } );
  5         87  
479 5     5 1 17616 }
480              
481             =head2 stop
482              
483             $slurm->stop;
484              
485             Stops the receiver run-loop, if running, causing its future to be cancelled.
486              
487             It is not an error to call this method if the run loop is not running.
488              
489             =cut
490              
491             method stop
492 12     12 1 39367 {
493 12 50       40 return unless $_run_f;
494              
495 12 50       26 eval { $_run_f->cancel } or warn "Failed to ->cancel the runloop future - $@";
  12         60  
496 12         3055 undef $_run_f;
497             }
498              
499             =head2 send_packet
500              
501             await $slurm->send_packet( $pktctrl, $payload );
502              
503             Sends a packet to the serial port.
504              
505             =cut
506              
507 31         50 async method send_packet ( $pktctrl, $payload )
  31         47  
  31         49  
  31         43  
508 31         80 {
509 31         43 printf STDERR "SLuRM TX-> {%02X/%v02X}\n", $pktctrl, $payload
510             if DEBUG > 1;
511              
512 31         102 my $bytes = pack( "C C", $pktctrl, length $payload );
513 31         87 $bytes .= pack( "C", crc8( $bytes ) );
514              
515 31         1037 $bytes .= $payload;
516 31         78 $bytes .= pack( "C", crc8( $bytes ) );
517              
518             $METRICS and
519 31 50 0     866 $METRICS->inc_counter( packets => [ dir => "tx", type => $PKTTYPE_NAME{ $pktctrl & 0xF0 } // "UNKNOWN" ] );
520 31 50       67 $METRICS and
521             $METRICS->inc_counter_by( serial_bytes => 1 + length $bytes, [ dir => "tx" ] );
522              
523 31         162 return await Future::IO->syswrite_exactly( $_fh, "\x55" . $bytes );
524 31     31 1 3433 }
525              
526 10         17 async method send_packet_twice ( $pktctrl, $payload )
  10         24  
  10         17  
  10         13  
527 10         29 {
528 10         27 await $self->send_packet( $pktctrl, $payload );
529             # TODO: Send again after a short delay
530 10         38022 await $self->send_packet( $pktctrl, $payload );
531 10     10 0 18 }
532              
533             =head2 reset
534              
535             $slurm->reset;
536              
537             Resets the transmitter sequence number and sends a META-RESET packet.
538              
539             It is not normally required to explicitly call this, as the first call to
540             L</run>, L</send_notify> or L</request> will do it if required.
541              
542             =cut
543              
544             async method reset
545 3         8 {
546 3         7 $_seqno_tx = 0;
547              
548 3         15 await $self->send_packet_twice( SLURM_PKTCTRL_META_RESET, pack "C", $_seqno_tx );
549 3         5186 $_did_reset = 1;
550              
551 3         18 $self->_start;
552              
553             # TODO: These might collide, do we need a Queue?
554 3         14 await $_next_resetack_f = $_run_f->new;
555 3         213 undef $_next_resetack_f;
556 3     3 1 2372 }
557              
558             =head2 send_notify
559              
560             await $slurm->send_notify( $payload )
561              
562             Sends a NOTIFY packet.
563              
564             Will automatically L first if required.
565              
566             =cut
567              
568 2         20 async method send_notify ( $payload )
  2         5  
  2         23  
569 2         8 {
570 2 50       9 $_did_reset or
571             await $self->reset;
572              
573 2         5 ( $_seqno_tx += 1 ) &= 0x0F;
574 2         4 my $pktctrl = SLURM_PKTCTRL_NOTIFY | $_seqno_tx;
575              
576 2         6 await $self->send_packet_twice( $pktctrl, $payload );
577 2     2 1 8328 }
578              
579             =head2 request
580              
581             $data_in = await $slurm->request( $data_out );
582              
583             Sends a REQUEST packet, and waits for a response to it.
584              
585             If the peer responds with an ERR packet, the returned future will fail with
586             an error message, the category of C<slurm>, and the payload body of the ERR
587             packet in the message details:
588              
589             $f->fail( $message, slurm => $payload );
590              
591             If the peer does not respond at all and all retransmit attempts end in a
592             timeout, the returned future will fail the same way but with C<undef> as the
593             message details:
594              
595             $f->fail( $message, slurm => undef );
596              
597             Will automatically L</reset> first if required.
598              
599             =cut
600              
601 6         9 async method request ( $payload )
  6         12  
  6         8  
602 6         26 {
603 6 100       20 $_did_reset or
604             await $self->reset;
605              
606 6         83 ( $_seqno_tx += 1 ) &= 0x0F;
607 6         11 my $seqno = $_seqno_tx;
608              
609 6         8 printf STDERR "SLuRM tx-REQUEST(%d): %v02X\n", $seqno, $payload
610             if DEBUG;
611              
612 6 50       32 $_pending_slots[$seqno] and croak "TODO: Request seqno collision - pick a new one?";
613              
614 6         12 my $pktctrl = SLURM_PKTCTRL_REQUEST | $seqno;
615              
616 6         14 await $self->send_packet( $pktctrl, $payload );
617              
618 6         8455 $self->_start;
619              
620 6         18 $_pending_slots[$seqno] = {
621             payload => $payload,
622             response_f => my $f = $_run_f->new,
623             retransmit_count => $_retransmit_count,
624             start_time => [ gettimeofday ],
625             };
626              
627 6         74 $self->_set_retransmit( $seqno );
628              
629 6         6803 return await $f;
630 6     6 1 6467 }
631              
632 9         15 method _set_retransmit ( $seqno )
  9         14  
  9         13  
633 9     9   27 {
634 9 50       26 my $slot = $_pending_slots[$seqno] or die "ARG expected $seqno request";
635              
636             $slot->{retransmit_f} = Future::IO->sleep( $_retransmit_delay )
637             ->on_done( sub {
638 4 100   4   2605 if( $slot->{retransmit_count}-- ) {
639 3         6 printf STDERR "SLuRM retransmit REQUEST(%d)\n", $seqno
640             if DEBUG;
641              
642 3         6 my $pktctrl = SLURM_PKTCTRL_REQUEST | $seqno;
643             $slot->{retransmit_f} = $self->send_packet( $pktctrl, $slot->{payload} )
644             ->on_fail( sub {
645 0         0 warn "Retransmit failed: @_";
646 0         0 $slot->{response_f}->fail( @_ );
647             } )
648             ->on_done( sub {
649 3         741 $self->_set_retransmit( $seqno );
650 3         8 } );
651              
652 3 50       3940 $METRICS and
653             $METRICS->inc_counter( retransmits => );
654             }
655             else {
656 1         2 printf STDERR "SLuRM timeout REQUEST(%d)\n", $seqno
657             if DEBUG;
658              
659 1         8 my $message = sprintf "Request timed out after %d attempts\n", 1 + $_retransmit_count;
660 1         4 $slot->{response_f}->fail( $message, slurm => undef );
661              
662 1 50       187 $METRICS and
663             $METRICS->inc_counter( timeouts => );
664              
665 1         4 undef $_pending_slots[$seqno];
666             }
667 9         29 });
668             }
669              
670             =head1 AUTHOR
671              
672             Paul Evans
673              
674             =cut
675              
676             0x55AA;