File Coverage

blib/lib/IO/Async/Channel.pm
Criterion Covered Total %
statement 156 168 92.8
branch 56 78 71.7
condition 31 48 64.5
subroutine 32 36 88.8
pod 6 11 54.5
total 281 341 82.4


line stmt bran cond sub pod time code
1             # You may distribute under the terms of either the GNU General Public License
2             # or the Artistic License (the same terms as Perl itself)
3             #
4             # (C) Paul Evans, 2011-2021 -- leonerd@leonerd.org.uk
5              
6             package IO::Async::Channel;
7              
8 13     13   58485 use strict;
  13         26  
  13         432  
9 13     13   70 use warnings;
  13         111  
  13         514  
10 13     13   76 use base qw( IO::Async::Notifier );
  13         26  
  13         1894  
11              
12             our $VERSION = '0.801';
13              
14 13     13   91 use Carp;
  13         27  
  13         707  
15              
16 13     13   5929 use IO::Async::Stream;
  13         37  
  13         33494  
17              
18             =head1 NAME
19              
20             C<IO::Async::Channel> - pass values into or out from an L<IO::Async::Routine>
21              
22             =head1 DESCRIPTION
23              
24             A C<IO::Async::Channel> object allows Perl values to be passed into or out of
25             an L<IO::Async::Routine>. It is intended to be used primarily with a Routine
26             object rather than independently. For more detail and examples on how to use
27             this object see also the documentation for L<IO::Async::Routine>.
28              
29             A Channel object is shared between the main process of the program and the
30             process running within the Routine. In the main process it will be used in
31             asynchronous mode, and in the Routine process it will be used in synchronous
32             mode. In asynchronous mode all methods return immediately and use
33             L<Future>-style futures or callback functions. In synchronous mode within the
34             Routine process the methods block until they are ready and may be used for
35             flow-control within the routine. Alternatively, a Channel may be shared
36             between two different Routine objects, and not used directly by the
37             controlling program.
38              
39             The channel itself represents a FIFO of Perl reference values. New values may
40             be put into the channel by the C<send> method in either mode. Values may be
41             retrieved from it by the C<recv> method. Values inserted into the Channel are
42             snapshot by the C<send> method. Any changes to referred variables will not be
43             observed by the other end of the Channel after the C<send> method returns.
44              
45             =head1 PARAMETERS
46              
47             The following named parameters may be passed to C<new> or C<configure>:
48              
49             =head2 codec => STR
50              
51             Gives the name of the encoding method used to represent values over the
52             channel.
53              
54             This can be set to C<Storable> to use the core L<Storable> module. As this
55             only supports references, to pass a single scalar value, C<send> a SCALAR
56             reference to it, and dereference the result of C<recv>.
57              
58             If the L<Sereal::Encoder> and L<Sereal::Decoder> modules are installed, this
59             can be set to C<Sereal> instead, and will use those to perform the encoding
60             and decoding. This optional dependency may give higher performance than using
61             C<Storable>. If these modules are available, then this option is picked by
62             default.
63              
64             =cut
65              
66             =head1 CONSTRUCTOR
67              
68             =cut
69              
70             =head2 new
71              
72             $channel = IO::Async::Channel->new
73              
74             Returns a new C<IO::Async::Channel> object. This object reference itself
75             should be shared by both sides of a C<fork()>ed process. After C<fork()> the
76             two C<setup_*> methods may be used to configure the object for operation on
77             either end.
78              
79             While this object does in fact inherit from L<IO::Async::Notifier>, it should
80             not be added to a Loop object directly; event management will be handled by
81             its containing L<IO::Async::Routine> object.
82              
83             =cut
84              
85             # Undocumented convenience constructors for running IaRoutine in 'spawn' mode
                # new_sync( $fd ): build a Channel with ->new (default parameters), then
                # switch it to synchronous mode on the given filehandle via
                # setup_sync_mode, and return it.
                # Coverage note: every count below is 0 - this path was not exercised.
86             sub new_sync
87             {
88 0     0 0 0 my $class = shift;
89 0         0 my ( $fd ) = @_;
90              
91 0         0 my $self = $class->new;
92 0         0 $self->setup_sync_mode( $fd );
93 0         0 return $self;
94             }
95              
96 0     0 0 0 sub new_stdin { shift->new_sync( \*STDIN ); }   # class method: sync-mode Channel on the STDIN glob (uncovered)
97 0     0 0 0 sub new_stdout { shift->new_sync( \*STDOUT ); }   # class method: sync-mode Channel on the STDOUT glob (uncovered)
98              
99             sub DESTROY
                # Destructor: makes a best-effort attempt to close the channel when the
                # object is destroyed. The call is wrapped in eval so that an exception
                # thrown by ->close does not propagate out of the destructor.
100             {
101 126     126   13917 my $self = shift;
102 126         223 eval { $self->close }; # ignore any error
  126         372  
103             }
104              
105             =head1 METHODS
106              
107             The following methods documented with a trailing call to C<< ->get >> return
108             L<Future> instances.
109              
110             =cut
111              
112             =head2 configure
113              
114             $channel->configure( %params )
115              
116             Similar to the standard C<configure> method on L<IO::Async::Notifier>, this is
117             used to change details of the Channel's operation.
118              
119             =over 4
120              
121             =item on_recv => CODE
122              
123             May only be set on an async mode channel. If present, will be invoked whenever
124             a new value is received, rather than using the C<recv> method.
125              
126             $on_recv->( $channel, $data )
127              
128             =item on_eof => CODE
129              
130             May only be set on an async mode channel. If present, will be invoked when the
131             channel gets closed by the peer.
132              
133             $on_eof->( $channel )
134              
135             =back
136              
137             =cut
138              
139             my $DEFAULT_CODEC;
                # _default_codec: returns the name of the codec to use when none was
                # requested. The answer is computed once and cached in the file-scoped
                # $DEFAULT_CODEC: "Sereal" when both Sereal::Encoder and Sereal::Decoder
                # can be loaded, otherwise "Storable".
140             sub _default_codec
141             {
142 153   66 153   614 $DEFAULT_CODEC ||= do {
                # eval yields the true result of the last require on success and undef
                # if either require dies, so 'defined' reports whether both loaded.
143 8         14 my $HAVE_SEREAL = defined eval {
144 8         49 require Sereal::Encoder; require Sereal::Decoder };
  8         41  
145 8 50       76 $HAVE_SEREAL ? "Sereal" : "Storable";
146             };
147             }
148              
149             sub _init
                # _init( $params ): initialisation hook. Fills in $params->{codec} with
                # the default codec name when the caller did not define one, then
                # delegates to the parent class's _init with the same params hashref.
150             {
151 148     148   256 my $self = shift;
152 148         280 my ( $params ) = @_;
153              
154 148 100       538 defined $params->{codec} or $params->{codec} = _default_codec;
155              
156 148         620 $self->SUPER::_init( $params );
157             }
158              
159             sub configure
                # configure( %params ): handles the Channel-specific keys on_recv, on_eof
                # and codec, removing each from %params, then passes whatever is left to
                # the parent class's configure. Croaks if on_recv/on_eof is given outside
                # async mode, or if the codec name has no matching _make_codec_* method.
160             {
161 149     149 1 273 my $self = shift;
162 149         507 my %params = @_;
163              
164 149         361 foreach (qw( on_recv on_eof )) {
165 298 100       686 next unless exists $params{$_};
                # Callbacks are only meaningful once setup_async_mode has set the mode.
166 2 50 33     9 $self->{mode} and $self->{mode} eq "async" or
167             croak "Can only configure $_ in async mode";
168              
169 2         5 $self->{$_} = delete $params{$_};
                # Make sure the underlying stream object exists (created lazily).
170 2         4 $self->_build_stream;
171             }
172              
                # A false codec value (undef, "", 0) skips this block entirely.
173 149 100       414 if( my $codec = delete $params{codec} ) {
                # The factory is invoked as a plain function with no arguments and
                # returns the (encode, decode) pair of code references.
174 148   33     985 @{ $self }{qw( encode decode )} = (
  148         587  
175             $self->can( "_make_codec_$codec" ) or croak "Unrecognised codec name '$codec'"
176             )->();
177             }
178              
179 149         562 $self->SUPER::configure( %params );
180             }
181              
182             sub _make_codec_Storable
                # Codec factory for "Storable": loads Storable on demand and returns the
                # two-element list ( encoder, decoder ) as references to Storable::freeze
                # and Storable::thaw. Coverage note: not exercised in this run.
183             {
184 0     0   0 require Storable;
185              
186             return
187 0         0 \&Storable::freeze,
188             \&Storable::thaw;
189             }
190              
191             sub _make_codec_Sereal
                # Codec factory for "Sereal": loads Sereal::Encoder and Sereal::Decoder
                # and returns the two-element list ( encoder, decoder ) of closures. Each
                # closure takes one argument and constructs its Sereal object on first use.
192             {
193 154     154   908 require Sereal::Encoder;
194 154         567 require Sereal::Decoder;
195              
196 154         257 my $encoder;
197             my $decoder;
198              
199             # "thread safety" to Sereal::{Encoder,Decoder} means that the variables get
200             # reset to undef in new threads. We should defend against that.
201              
                # The ||= inside each closure re-creates the object whenever the captured
                # variable is found to be false, which covers the reset described above.
202             return
203 113   66 113   2154 sub { ( $encoder ||= Sereal::Encoder->new )->encode( $_[0] ) },
204 154   66 110   1302 sub { ( $decoder ||= Sereal::Decoder->new )->decode( $_[0] ) };
  110         3348  
205             }
206              
207             =head2 send
208              
209             $channel->send( $data )
210              
211             Pushes the data stored in the given Perl reference into the FIFO of the
212             Channel, where it can be received by the other end. When called on a
213             synchronous mode Channel this method may block if a C<write()> call on the
214             underlying filehandle blocks. When called on an asynchronous mode channel this
215             method will not block.
216              
217             =cut
218              
219             my %SENDMETHODS;
                # send( $data ): dispatches to the mode-specific method _send_<mode>.
                # %SENDMETHODS caches the resolved code reference per mode name, shared
                # by all instances. Dies if no mode has been set up yet, or if no
                # _send_<mode> method exists for the current mode.
220             sub send
221             {
222 21     21 1 2359 my $self = shift;
223 21         56 my ( $data ) = @_;
224              
225 21 50       154 defined( my $mode = $self->{mode} ) or die "Cannot ->send without being set up";
226              
227 21 50 66     350 my $code = ( $SENDMETHODS{$mode} ||= $self->can( "_send_$mode" ) )
228             or die "IO::Async::Channel cannot send in unrecognised mode '$mode'";
229              
230 21         152 $self->$code( $data );
231             }
232              
                # _send_sync and _send_async are two names for the same anonymous sub:
                # encode $data with this object's encoder, then hand the resulting
                # record to send_encoded (which does the per-mode byte transport).
233             *_send_sync = *_send_async = sub {
234 21     21   67 my ( $self, $data ) = @_;
235 21         138 $self->send_encoded( $self->{encode}->( $data ) );
236             };
237              
238             =head2 send_encoded
239              
240             $channel->send_encoded( $record )
241              
242             A variant of the C<send> method; this method pushes the byte record given.
243             This should be the result of a call to C<encode>.
244              
245             =cut
246              
247             sub send_encoded
                # send_encoded( $record ): frames an already-encoded record and writes it
                # using the transport for the current mode. Dies if no mode is set.
                # If the mode is neither "sync" nor "async" (this file also sets "dead"),
                # neither return statement fires and nothing is sent.
248             {
249 111     111 1 219 my $self = shift;
250 111         211 my ( $record ) = @_;
251              
                # Wire format: length prefix packed with template "I" (native unsigned
                # int), followed by the record. The readers in this file consume exactly
                # 4 bytes for the prefix.
252 111         649 my $bytes = pack( "I", length $record ) . $record;
253              
254 111 50       338 defined $self->{mode} or die "Cannot ->send without being set up";
255              
256 111 100       296 return $self->_sendbytes_sync( $bytes ) if $self->{mode} eq "sync";
257 101 50       868 return $self->_sendbytes_async( $bytes ) if $self->{mode} eq "async";
258             }
259              
260             =head2 encode
261              
262             $record = $channel->encode( $data )
263              
264             Takes a Perl reference and returns a serialised string that can be passed to
265             C<send_encoded>. The following two forms are equivalent
266              
267             $channel->send( $data )
268             $channel->send_encoded( $channel->encode( $data ) )
269              
270             This is provided for the use-case where data needs to be serialised into a
271             fixed string to "snapshot it" but not sent yet; the returned string can be
272             saved and sent at a later time.
273              
274             $record = IO::Async::Channel->encode( $data )
275              
276             This can also be used as a class method, in case it is inconvenient to operate
277             on a particular object instance, or when one does not exist yet. In this case
278             it will encode using whatever is the default codec for C<IO::Async::Channel>.
279              
280             =cut
281              
282             my $default_encode;
                # encode( $data ): serialises $data and returns the record string.
                # Called on an instance it uses that object's {encode} code reference;
                # called on the class name it uses the encoder half of the default
                # codec, built once and cached in the file-scoped $default_encode.
283             sub encode
284             {
285 92     92 1 212 my $self = shift;
286 92         184 my ( $data ) = @_;
287              
                # NOTE(review): ?: binds tighter than ||=, so this parses as
                # ( ref $self ? $self->{encode} : $default_encode ) ||= do {...}.
                # For an instance whose {encode} were unset, the default encoder would be
                # assigned into $self->{encode} - confirm that this is intended.
288             return ( ref $self ?
289             $self->{encode} :
290 92 100 66     418 $default_encode ||= do { ( $self->can( "_make_codec_" . _default_codec )->() )[0] }
  6         57  
291             )->( $data );
292             }
293              
294             =head2 recv
295              
296             $data = $channel->recv
297              
298             When called on a synchronous mode Channel this method will block until a Perl
299             reference value is available from the other end and then return it. If the
300             Channel is closed this method will return C<undef>. Since only references may
301             be passed and all Perl references are true the truth of the result of this
302             method can be used to detect that the channel is still open and has not yet
303             been closed.
304              
305             $data = $channel->recv->get
306              
307             When called on an asynchronous mode Channel this method returns a future which
308             will eventually yield the next Perl reference value that becomes available
309             from the other end. If the Channel is closed, the future will fail with an
310             C<eof> failure.
311              
312             $channel->recv( %args )
313              
314             When not returning a future, takes the following named arguments:
315              
316             =over 8
317              
318             =item on_recv => CODE
319              
320             Called when a new Perl reference value is available. Will be passed the
321             Channel object and the reference data.
322              
323             $on_recv->( $channel, $data )
324              
325             =item on_eof => CODE
326              
327             Called if the Channel was closed before a new value was ready. Will be passed
328             the Channel object.
329              
330             $on_eof->( $channel )
331              
332             =back
333              
334             =cut
335              
336             my %RECVMETHODS;
                # recv( @args ): dispatches to the mode-specific method _recv_<mode>,
                # forwarding any arguments and returning its result. %RECVMETHODS caches
                # the resolved code reference per mode name. Dies if no mode has been set
                # up, or if no _recv_<mode> method exists for the current mode.
337             sub recv
338             {
339 115     115 1 602 my $self = shift;
340              
341 115 50       531 defined( my $mode = $self->{mode} ) or die "Cannot ->recv without being set up";
342              
343 115 50 66     662 my $code = ( $RECVMETHODS{$mode} ||= $self->can( "_recv_$mode" ) )
344             or die "IO::Async::Channel cannot recv in unrecognised mode '$mode'";
345              
346 115         390 return $self->$code( @_ );
347             }
348              
349             =head2 close
350              
351             $channel->close
352              
353             Closes the channel. Causes a pending C<recv> on the other end to return undef
354             or the queued C<on_eof> callbacks to be invoked.
355              
356             =cut
357              
358             my %CLOSEMETHODS;
                # close: dispatches to the mode-specific method _close_<mode> and returns
                # its result. Unlike send/recv, a channel with no mode set is not an
                # error - the method simply returns. Dies if the mode is set but has no
                # _close_<mode> method. %CLOSEMETHODS caches the lookup per mode name.
359             sub close
360             {
361 182     182 1 1867 my $self = shift;
362              
363 182 50       600 defined( my $mode = $self->{mode} ) or return;
364              
365 182 50 66     797 my $code = ( $CLOSEMETHODS{$mode} ||= $self->can( "_close_$mode" ) )
366             or die "IO::Async::Channel cannot close in unrecognised mode '$mode'";
367              
368 182         428 return $self->$code;
369             }
370              
371             # Leave this undocumented for now
                # setup_sync_mode( $fh ): stores the filehandle in {fh}, marks the channel
                # as "sync", and prepares the handle for blocking binary I/O.
372             sub setup_sync_mode
373             {
374 11     11 0 92 my $self = shift;
375 11         62 ( $self->{fh} ) = @_;
376              
377 11         37 $self->{mode} = "sync";
378              
379             # Since we're communicating binary structures and not Unicode text we need to
380             # enable binmode
381 11         34 binmode $self->{fh};
382              
                # Any read_handle/write_handle already stored on the object is put back
                # into blocking mode; undefined slots are skipped.
383 11   66     73 defined and $_->blocking( 1 ) for $self->{read_handle}, $self->{write_handle};
384 11         108 $self->{fh}->autoflush(1);
385             }
386              
387             sub _read_exactly
                # _read_exactly( $fh, $buffer, $len ): plain function (not a method) that
                # reads until $buffer holds $len bytes. It works directly on @_, so the
                # caller's buffer variable ($_[1]) is reset and filled in place.
                # Returns $len on success, "" if read() reports end-of-file first, or
                # undef if read() fails.
388             {
389 12     12   18 $_[1] = "";
390              
391 12         34 while( length $_[1] < $_[2] ) {
                # Request only the bytes still missing, appending at the current end.
392 12         510 my $n = read( $_[0], $_[1], $_[2]-length $_[1], length $_[1] );
393 12 50       38 defined $n or return undef;
394 12 100       50 $n or return "";
395             }
396              
397 10         14 return $_[2];
398             }
399              
400             sub _recv_sync
                # Synchronous receive: reads one framed record from {fh} and returns the
                # decoded value. Returns undef if end-of-file is hit while reading either
                # the length prefix or the record; dies with "Cannot read - $!" if a read
                # fails.
401             {
402 7     7   10 my $self = shift;
403              
                # 4-byte length prefix first ...
404 7         16 my $n = _read_exactly( $self->{fh}, my $lenbuffer, 4 );
405 7 50       24 defined $n or die "Cannot read - $!";
                # _read_exactly signals EOF with the empty string, hence the length test.
406 7 100       25 length $n or return undef;
407              
408 5         23 my $len = unpack( "I", $lenbuffer );
409              
                # ... then exactly $len bytes of encoded record.
410 5         13 $n = _read_exactly( $self->{fh}, my $record, $len );
411 5 50       16 defined $n or die "Cannot read - $!";
412 5 50       17 length $n or return undef;
413              
414 5         18 return $self->{decode}->( $record );
415             }
416              
417             sub _sendbytes_sync
                # Sync-mode transport: prints the already-framed byte string to {fh} and
                # returns the result of that print call.
418             {
419 10     10   13 my $self = shift;
420 10         14 my ( $bytes ) = @_;
421 10         28 $self->{fh}->print( $bytes );
422             }
423              
424             sub _close_sync
                # Sync-mode close: closes the {fh} filehandle and returns the result.
425             {
426 14     14   15 my $self = shift;
427 14         37 $self->{fh}->close;
428             }
429              
430             # Leave this undocumented for now
                # setup_async_mode( %args ): accepts the named arguments read_handle and
                # write_handle (either may be omitted), stores them on the object, makes
                # the stored handles non-blocking and marks the channel as "async".
                # Croaks if any other key is passed.
431             sub setup_async_mode
432             {
433 140     140 0 775 my $self = shift;
434 140         333 my %args = @_;
435              
                # Move recognised keys out of %args so that only unknown ones remain.
436 140   66     648 exists $args{$_} and $self->{$_} = delete $args{$_} for qw( read_handle write_handle );
437              
438 140 50       353 keys %args and croak "Unrecognised keys for setup_async_mode: " . join( ", ", keys %args );
439              
440 140   66     1934 defined and $_->blocking( 0 ) for $self->{read_handle}, $self->{write_handle};
441 140         765 $self->{mode} = "async";
442             }
443              
444             sub _build_stream
                # Returns the IO::Async::Stream used for async-mode I/O, creating it on
                # first call and caching it in {stream}. Creation also resets
                # {on_result_queue} to an empty list and adds the stream as a child of
                # this notifier.
445             {
446 211     211   273 my $self = shift;
447 211   66     2170 return $self->{stream} ||= do {
448 128         429 $self->{on_result_queue} = [];
449              
                # Incoming data is routed to _on_stream_read through a weak-self capture.
450             my $stream = IO::Async::Stream->new(
451             read_handle => $self->{read_handle},
452             write_handle => $self->{write_handle},
453 128         676 autoflush => 1,
454             on_read => $self->_capture_weakself( '_on_stream_read' )
455             );
456              
457 128         614 $self->add_child( $stream );
458              
459 128         1047 $stream;
460             };
461             }
462              
463             sub _sendbytes_async
                # Async-mode transport: writes the already-framed byte string to the
                # (lazily built) stream and returns whatever its ->write returns.
464             {
465 101     101   220 my $self = shift;
466 101         220 my ( $bytes ) = @_;
467 101         270 $self->_build_stream->write( $bytes );
468             }
469              
470             sub _recv_async
                # Async receive: queues one handler on {on_result_queue} for the next
                # incoming record (or EOF). Accepts optional named arguments on_recv and
                # on_eof. Returns a new future from the stream's loop unless called in
                # void context, in which case it returns undef and only the callbacks
                # are used.
471             {
472 108     108   190 my $self = shift;
473 108         248 my %args = @_;
474              
475 108         228 my $on_recv = $args{on_recv};
476 108         204 my $on_eof = $args{on_eof};
477              
478 108         293 my $stream = $self->_build_stream;
479              
480 108         190 my $f;
                # wantarray is undef only in void context, so a future is created for
                # both scalar and list context callers.
481 108 100       697 $f = $stream->loop->new_future unless !defined wantarray;
482              
                # The queued handler is invoked as ( $self, $type, $result ) where $type
                # is "recv" for a decoded value; any other type is treated as EOF.
                # A cancelled future is left untouched; callbacks still run.
483 108         1575 push @{ $self->{on_result_queue} }, sub {
484 108     108   372 my ( $self, $type, $result ) = @_;
485 108 100       328 if( $type eq "recv" ) {
486 104 100 100     1082 $f->done( $result ) if $f and !$f->is_cancelled;
487 104 100       6721 $on_recv->( $self, $result ) if $on_recv;
488             }
489             else {
490 4 100 66     71 $f->fail( "EOF waiting for Channel recv", eof => ) if $f and !$f->is_cancelled;
491 4 100       218 $on_eof->( $self ) if $on_eof;
492             }
493 108         181 };
494              
495 108         2613 return $f;
496             }
497              
498             sub _close_async
                # Async-mode close. If a stream has been built, asks it to close once it
                # is empty (close_when_empty); otherwise closes whichever raw handles are
                # set. In both cases the {read_handle} and {write_handle} slots are then
                # set to undef.
499             {
500 168     168   265 my $self = shift;
501 168 100       406 if( my $stream = $self->{stream} ) {
502 163         477 $stream->close_when_empty;
503             }
504             else {
505 5   66     109 $_ and $_->close for $self->{read_handle}, $self->{write_handle};
506             }
507              
                # $_ aliases the hash slots here, so this clears the stored handles.
508 168         6721 undef $_ for $self->{read_handle}, $self->{write_handle};
509             }
510              
511             sub _on_stream_read
                # Stream on_read handler, called as ( $self, $stream, $buffref, $eof ).
                # Extracts at most one complete framed record from the buffer per call,
                # decodes it and delivers it to the oldest queued recv handler, or to the
                # configured {on_recv} callback when the queue is empty.
                # Returns 0 when the buffer does not yet hold a whole record, 1 after a
                # record was consumed, and an empty return on EOF.
                # NOTE(review): presumably 1 asks IO::Async::Stream to call again and 0
                # waits for more data - confirm against the Stream on_read contract.
512             {
                # $self arrives through a weak capture; bail out if it has gone away.
513 112 50   112   446 my $self = shift or return;
514 112         308 my ( $stream, $buffref, $eof ) = @_;
515              
516 112 100       274 if( $eof ) {
                # Fail every still-pending recv, then fire the channel-level on_eof.
517 7         47 while( my $on_result = shift @{ $self->{on_result_queue} } ) {
  11         95  
518 4         57 $on_result->( $self, eof => );
519             }
520 7 100       81 $self->{on_eof}->( $self ) if $self->{on_eof};
521 7         45 return;
522             }
523              
                # Need the 4-byte length prefix, then the full record body.
524 105 50       269 return 0 unless length( $$buffref ) >= 4;
525 105         421 my $len = unpack( "I", $$buffref );
526 105 50       321 return 0 unless length( $$buffref ) >= 4 + $len;
527              
528 105         552 my $record = $self->{decode}->( substr( $$buffref, 4, $len ) );
                # Remove the consumed prefix and record from the front of the buffer.
529 105         456 substr( $$buffref, 0, 4 + $len ) = "";
530              
531 105 100       170 if( my $on_result = shift @{ $self->{on_result_queue} } ) {
  105         361  
532 104         281 $on_result->( $self, recv => $record );
533             }
534             else {
                # No pending recv: {on_recv} is called unconditionally here, so it must
                # have been configured for this path to succeed.
535 1         4 $self->{on_recv}->( $self, $record );
536             }
537              
538 105         1564 return 1;
539             }
540              
541             sub _extract_read_handle
                # Hands the raw read handle over to the caller. Returns undef if the
                # channel has no mode set; croaks unless the mode is "async". On success
                # the mode is changed to "dead" before {read_handle} is returned.
542             {
543 66     66   169 my $self = shift;
544              
545 66 100       351 return undef if !$self->{mode};
546              
547 1 50       30 croak "Cannot extract filehandle" if $self->{mode} ne "async";
548 1         26 $self->{mode} = "dead";
549              
550 1         15 return $self->{read_handle};
551             }
552              
553             sub _extract_write_handle
                # Hands the raw write handle over to the caller. Returns undef if the
                # channel has no mode set; croaks unless the mode is "async". On success
                # the mode is changed to "dead" before {write_handle} is returned.
                # Coverage note: only the early "no mode" return was exercised in this
                # run; the statements after it have no recorded executions.
554             {
555 67     67   124 my $self = shift;
556              
557 67 50       354 return undef if !$self->{mode};
558              
559 0 0         croak "Cannot extract filehandle" if $self->{mode} ne "async";
560 0           $self->{mode} = "dead";
561              
562 0           return $self->{write_handle};
563             }
564              
565             =head1 AUTHOR
566              
567             Paul Evans
568              
569             =cut
570              
571             0x55AA;