File Coverage

blib/lib/IO/Async/Channel.pm
Criterion Covered Total %
statement 156 168 92.8
branch 56 78 71.7
condition 31 48 64.5
subroutine 32 36 88.8
pod 6 11 54.5
total 281 341 82.4


line stmt bran cond sub pod time code
1             # You may distribute under the terms of either the GNU General Public License
2             # or the Artistic License (the same terms as Perl itself)
3             #
4             # (C) Paul Evans, 2011-2021 -- leonerd@leonerd.org.uk
5              
6             package IO::Async::Channel;
7              
8 13     13   61603 use strict;
  13         18  
  13         361  
9 13     13   70 use warnings;
  13         26  
  13         336  
10 13     13   55 use base qw( IO::Async::Notifier );
  13         22  
  13         1637  
11              
12             our $VERSION = '0.802';
13              
14 13     13   77 use Carp;
  13         28  
  13         745  
15              
16 13     13   5119 use IO::Async::Stream;
  13         37  
  13         27419  
17              
18             =head1 NAME
19              
20             C - pass values into or out from an L
21              
22             =head1 DESCRIPTION
23              
24             A C object allows Perl values to be passed into or out of
25             an L. It is intended to be used primarily with a Routine
26             object rather than independently. For more detail and examples on how to use
27             this object see also the documentation for L.
28              
29             A Channel object is shared between the main process of the program and the
30             process running within the Routine. In the main process it will be used in
31             asynchronous mode, and in the Routine process it will be used in synchronous
32             mode. In asynchronous mode all methods return immediately and use
33             L-style futures or callback functions. In synchronous within the
34             Routine process the methods block until they are ready and may be used for
35             flow-control within the routine. Alternatively, a Channel may be shared
36             between two different Routine objects, and not used directly by the
37             controlling program.
38              
39             The channel itself represents a FIFO of Perl reference values. New values may
40             be put into the channel by the C method in either mode. Values may be
41             retrieved from it by the C method. Values inserted into the Channel are
42             snapshot by the C method. Any changes to referred variables will not be
43             observed by the other end of the Channel after the C method returns.
44              
45             =head1 PARAMETERS
46              
47             The following named parameters may be passed to C or C:
48              
49             =head2 codec => STR
50              
51             Gives the name of the encoding method used to represent values over the
52             channel.
53              
54             This can be set to C to use the core L module. As this
55             only supports references, to pass a single scalar value, C a SCALAR
56             reference to it, and dereference the result of C.
57              
58             If the L and L modules are installed, this
59             can be set to C instead, and will use those to perform the encoding
60             and decoding. This optional dependency may give higher performance than using
61             C. If these modules are available, then this option is picked by
62             default.
63              
64             =cut
65              
66             =head1 CONSTRUCTOR
67              
68             =cut
69              
70             =head2 new
71              
72             $channel = IO::Async::Channel->new
73              
74             Returns a new C object. This object reference itself
75             should be shared by both sides of a Ced process. After C the
76             two C methods may be used to configure the object for operation on
77             either end.
78              
79             While this object does in fact inherit from L, it should
80             not be added to a Loop object directly; event management will be handled by
81             its containing L object.
82              
83             =cut
84              
85             # Undocumented convenience constructors for running IaRoutine in 'spawn' mode
86             sub new_sync
87             {
88 0     0 0 0 my $class = shift;
89 0         0 my ( $fd ) = @_;
90              
91 0         0 my $self = $class->new;
92 0         0 $self->setup_sync_mode( $fd );
93 0         0 return $self;
94             }
95              
96 0     0 0 0 sub new_stdin { shift->new_sync( \*STDIN ); }
97 0     0 0 0 sub new_stdout { shift->new_sync( \*STDOUT ); }
98              
99             sub DESTROY
100             {
101 126     126   14588 my $self = shift;
102 126         187 eval { $self->close }; # ignore any error
  126         305  
103             }
104              
105             =head1 METHODS
106              
107             The following methods documented with a trailing call to C<< ->get >> return
108             L instances.
109              
110             =cut
111              
112             =head2 configure
113              
114             $channel->configure( %params )
115              
116             Similar to the standard C method on L, this is
117             used to change details of the Channel's operation.
118              
119             =over 4
120              
121             =item on_recv => CODE
122              
123             May only be set on an async mode channel. If present, will be invoked whenever
124             a new value is received, rather than using the C method.
125              
126             $on_recv->( $channel, $data )
127              
128             =item on_eof => CODE
129              
130             May only be set on an async mode channel. If present, will be invoked when the
131             channel gets closed by the peer.
132              
133             $on_eof->( $channel )
134              
135             =back
136              
137             =cut
138              
139             my $DEFAULT_CODEC;
140             sub _default_codec
141             {
142 153   66 153   756 $DEFAULT_CODEC ||= do {
143 8         10 my $HAVE_SEREAL = defined eval {
144 8         44 require Sereal::Encoder; require Sereal::Decoder };
  8         72  
145 8 50       98 $HAVE_SEREAL ? "Sereal" : "Storable";
146             };
147             }
148              
149             sub _init
150             {
151 148     148   194 my $self = shift;
152 148         285 my ( $params ) = @_;
153              
154 148 100       561 defined $params->{codec} or $params->{codec} = _default_codec;
155              
156 148         625 $self->SUPER::_init( $params );
157             }
158              
159             sub configure
160             {
161 149     149 1 222 my $self = shift;
162 149         630 my %params = @_;
163              
164 149         331 foreach (qw( on_recv on_eof )) {
165 298 100       597 next unless exists $params{$_};
166 2 50 33     11 $self->{mode} and $self->{mode} eq "async" or
167             croak "Can only configure $_ in async mode";
168              
169 2         5 $self->{$_} = delete $params{$_};
170 2         5 $self->_build_stream;
171             }
172              
173 149 100       408 if( my $codec = delete $params{codec} ) {
174 148   33     877 @{ $self }{qw( encode decode )} = (
  148         507  
175             $self->can( "_make_codec_$codec" ) or croak "Unrecognised codec name '$codec'"
176             )->();
177             }
178              
179 149         483 $self->SUPER::configure( %params );
180             }
181              
182             sub _make_codec_Storable
183             {
184 0     0   0 require Storable;
185              
186             return
187 0         0 \&Storable::freeze,
188             \&Storable::thaw;
189             }
190              
191             sub _make_codec_Sereal
192             {
193 154     154   734 require Sereal::Encoder;
194 154         410 require Sereal::Decoder;
195              
196 154         211 my $encoder;
197             my $decoder;
198              
199             # "thread safety" to Sereal::{Encoder,Decoder} means that the variables get
200             # reset to undef in new threads. We should defend against that.
201              
202             return
203 113   66 113   1899 sub { ( $encoder ||= Sereal::Encoder->new )->encode( $_[0] ) },
204 154   66 110   1018 sub { ( $decoder ||= Sereal::Decoder->new )->decode( $_[0] ) };
  110         2818  
205             }
206              
207             =head2 send
208              
209             $channel->send( $data )
210              
211             Pushes the data stored in the given Perl reference into the FIFO of the
212             Channel, where it can be received by the other end. When called on a
213             synchronous mode Channel this method may block if a C call on the
214             underlying filehandle blocks. When called on an asynchronous mode channel this
215             method will not block.
216              
217             =cut
218              
219             my %SENDMETHODS;
220             sub send
221             {
222 21     21 1 2089 my $self = shift;
223 21         380 my ( $data ) = @_;
224              
225 21 50       158 defined( my $mode = $self->{mode} ) or die "Cannot ->send without being set up";
226              
227 21 50 66     361 my $code = ( $SENDMETHODS{$mode} ||= $self->can( "_send_$mode" ) )
228             or die "IO::Async::Channel cannot send in unrecognised mode '$mode'";
229              
230 21         125 $self->$code( $data );
231             }
232              
233             *_send_sync = *_send_async = sub {
234 21     21   59 my ( $self, $data ) = @_;
235 21         105 $self->send_encoded( $self->{encode}->( $data ) );
236             };
237              
238             =head2 send_encoded
239              
240             $channel->send_encoded( $record )
241              
242             A variant of the C method; this method pushes the byte record given.
243             This should be the result of a call to C.
244              
245             =cut
246              
247             sub send_encoded
248             {
249 111     111 1 204 my $self = shift;
250 111         206 my ( $record ) = @_;
251              
252 111         732 my $bytes = pack( "I", length $record ) . $record;
253              
254 111 50       312 defined $self->{mode} or die "Cannot ->send without being set up";
255              
256 111 100       269 return $self->_sendbytes_sync( $bytes ) if $self->{mode} eq "sync";
257 101 50       628 return $self->_sendbytes_async( $bytes ) if $self->{mode} eq "async";
258             }
259              
260             =head2 encode
261              
262             $record = $channel->encode( $data )
263              
264             Takes a Perl reference and returns a serialised string that can be passed to
265             C. The following two forms are equivalent
266              
267             $channel->send( $data )
268             $channel->send_encoded( $channel->encode( $data ) )
269              
270             This is provided for the use-case where data needs to be serialised into a
271             fixed string to "snapshot it" but not sent yet; the returned string can be
272             saved and sent at a later time.
273              
274             $record = IO::Async::Channel->encode( $data )
275              
276             This can also be used as a class method, in case it is inconvenient to operate
277             on a particular object instance, or when one does not exist yet. In this case
278             it will encode using whatever is the default codec for C.
279              
280             =cut
281              
282             my $default_encode;
283             sub encode
284             {
285 92     92 1 276 my $self = shift;
286 92         171 my ( $data ) = @_;
287              
288             return ( ref $self ?
289             $self->{encode} :
290 92 100 66     403 $default_encode ||= do { ( $self->can( "_make_codec_" . _default_codec )->() )[0] }
  6         36  
291             )->( $data );
292             }
293              
294             =head2 recv
295              
296             $data = $channel->recv
297              
298             When called on a synchronous mode Channel this method will block until a Perl
299             reference value is available from the other end and then return it. If the
300             Channel is closed this method will return C. Since only references may
301             be passed and all Perl references are true the truth of the result of this
302             method can be used to detect that the channel is still open and has not yet
303             been closed.
304              
305             $data = $channel->recv->get
306              
307             When called on an asynchronous mode Channel this method returns a future which
308             will eventually yield the next Perl reference value that becomes available
309             from the other end. If the Channel is closed, the future will fail with an
310             C failure.
311              
312             $channel->recv( %args )
313              
314             When not returning a future, takes the following named arguments:
315              
316             =over 8
317              
318             =item on_recv => CODE
319              
320             Called when a new Perl reference value is available. Will be passed the
321             Channel object and the reference data.
322              
323             $on_recv->( $channel, $data )
324              
325             =item on_eof => CODE
326              
327             Called if the Channel was closed before a new value was ready. Will be passed
328             the Channel object.
329              
330             $on_eof->( $channel )
331              
332             =back
333              
334             =cut
335              
336             my %RECVMETHODS;
337             sub recv
338             {
339 115     115 1 511 my $self = shift;
340              
341 115 50       471 defined( my $mode = $self->{mode} ) or die "Cannot ->recv without being set up";
342              
343 115 50 66     597 my $code = ( $RECVMETHODS{$mode} ||= $self->can( "_recv_$mode" ) )
344             or die "IO::Async::Channel cannot recv in unrecognised mode '$mode'";
345              
346 115         341 return $self->$code( @_ );
347             }
348              
349             =head2 close
350              
351             $channel->close
352              
353             Closes the channel. Causes a pending C on the other end to return undef
354             or the queued C callbacks to be invoked.
355              
356             =cut
357              
358             my %CLOSEMETHODS;
359             sub close
360             {
361 182     182 1 2505 my $self = shift;
362              
363 182 50       493 defined( my $mode = $self->{mode} ) or return;
364              
365 182 50 66     646 my $code = ( $CLOSEMETHODS{$mode} ||= $self->can( "_close_$mode" ) )
366             or die "IO::Async::Channel cannot close in unrecognised mode '$mode'";
367              
368 182         498 return $self->$code;
369             }
370              
371             # Leave this undocumented for now
372             sub setup_sync_mode
373             {
374 11     11 0 92 my $self = shift;
375 11         39 ( $self->{fh} ) = @_;
376              
377 11         45 $self->{mode} = "sync";
378              
379             # Since we're communicating binary structures and not Unicode text we need to
380             # enable binmode
381 11         44 binmode $self->{fh};
382              
383 11   66     65 defined and $_->blocking( 1 ) for $self->{read_handle}, $self->{write_handle};
384 11         113 $self->{fh}->autoflush(1);
385             }
386              
387             sub _read_exactly
388             {
389 12     12   23 $_[1] = "";
390              
391 12         31 while( length $_[1] < $_[2] ) {
392 12         419 my $n = read( $_[0], $_[1], $_[2]-length $_[1], length $_[1] );
393 12 50       35 defined $n or return undef;
394 12 100       35 $n or return "";
395             }
396              
397 10         18 return $_[2];
398             }
399              
400             sub _recv_sync
401             {
402 7     7   15 my $self = shift;
403              
404 7         26 my $n = _read_exactly( $self->{fh}, my $lenbuffer, 4 );
405 7 50       15 defined $n or die "Cannot read - $!";
406 7 100       27 length $n or return undef;
407              
408 5         33 my $len = unpack( "I", $lenbuffer );
409              
410 5         15 $n = _read_exactly( $self->{fh}, my $record, $len );
411 5 50       11 defined $n or die "Cannot read - $!";
412 5 50       26 length $n or return undef;
413              
414 5         20 return $self->{decode}->( $record );
415             }
416              
417             sub _sendbytes_sync
418             {
419 10     10   16 my $self = shift;
420 10         20 my ( $bytes ) = @_;
421 10         33 $self->{fh}->print( $bytes );
422             }
423              
424             sub _close_sync
425             {
426 14     14   20 my $self = shift;
427 14         42 $self->{fh}->close;
428             }
429              
430             # Leave this undocumented for now
431             sub setup_async_mode
432             {
433 140     140 0 686 my $self = shift;
434 140         311 my %args = @_;
435              
436 140   66     810 exists $args{$_} and $self->{$_} = delete $args{$_} for qw( read_handle write_handle );
437              
438 140 50       301 keys %args and croak "Unrecognised keys for setup_async_mode: " . join( ", ", keys %args );
439              
440 140   66     1596 defined and $_->blocking( 0 ) for $self->{read_handle}, $self->{write_handle};
441 140         614 $self->{mode} = "async";
442             }
443              
444             sub _build_stream
445             {
446 211     211   298 my $self = shift;
447 211   66     1635 return $self->{stream} ||= do {
448 128         376 $self->{on_result_queue} = [];
449              
450             my $stream = IO::Async::Stream->new(
451             read_handle => $self->{read_handle},
452             write_handle => $self->{write_handle},
453 128         577 autoflush => 1,
454             on_read => $self->_capture_weakself( '_on_stream_read' )
455             );
456              
457 128         619 $self->add_child( $stream );
458              
459 128         1153 $stream;
460             };
461             }
462              
463             sub _sendbytes_async
464             {
465 101     101   186 my $self = shift;
466 101         174 my ( $bytes ) = @_;
467 101         221 $self->_build_stream->write( $bytes );
468             }
469              
470             sub _recv_async
471             {
472 108     108   157 my $self = shift;
473 108         181 my %args = @_;
474              
475 108         175 my $on_recv = $args{on_recv};
476 108         156 my $on_eof = $args{on_eof};
477              
478 108         209 my $stream = $self->_build_stream;
479              
480 108         186 my $f;
481 108 100       431 $f = $stream->loop->new_future unless !defined wantarray;
482              
483 108         1221 push @{ $self->{on_result_queue} }, sub {
484 108     108   386 my ( $self, $type, $result ) = @_;
485 108 100       290 if( $type eq "recv" ) {
486 104 100 100     1150 $f->done( $result ) if $f and !$f->is_cancelled;
487 104 100       5612 $on_recv->( $self, $result ) if $on_recv;
488             }
489             else {
490 4 100 66     52 $f->fail( "EOF waiting for Channel recv", eof => ) if $f and !$f->is_cancelled;
491 4 100       168 $on_eof->( $self ) if $on_eof;
492             }
493 108         221 };
494              
495 108         1915 return $f;
496             }
497              
498             sub _close_async
499             {
500 168     168   256 my $self = shift;
501 168 100       366 if( my $stream = $self->{stream} ) {
502 163         434 $stream->close_when_empty;
503             }
504             else {
505 5   66     112 $_ and $_->close for $self->{read_handle}, $self->{write_handle};
506             }
507              
508 168         3463 undef $_ for $self->{read_handle}, $self->{write_handle};
509             }
510              
511             sub _on_stream_read
512             {
513 112 50   112   522 my $self = shift or return;
514 112         282 my ( $stream, $buffref, $eof ) = @_;
515              
516 112 100       230 if( $eof ) {
517 7         56 while( my $on_result = shift @{ $self->{on_result_queue} } ) {
  11         90  
518 4         39 $on_result->( $self, eof => );
519             }
520 7 100       68 $self->{on_eof}->( $self ) if $self->{on_eof};
521 7         45 return;
522             }
523              
524 105 50       223 return 0 unless length( $$buffref ) >= 4;
525 105         306 my $len = unpack( "I", $$buffref );
526 105 50       249 return 0 unless length( $$buffref ) >= 4 + $len;
527              
528 105         465 my $record = $self->{decode}->( substr( $$buffref, 4, $len ) );
529 105         371 substr( $$buffref, 0, 4 + $len ) = "";
530              
531 105 100       140 if( my $on_result = shift @{ $self->{on_result_queue} } ) {
  105         302  
532 104         321 $on_result->( $self, recv => $record );
533             }
534             else {
535 1         3 $self->{on_recv}->( $self, $record );
536             }
537              
538 105         1841 return 1;
539             }
540              
541             sub _extract_read_handle
542             {
543 66     66   100 my $self = shift;
544              
545 66 100       286 return undef if !$self->{mode};
546              
547 1 50       22 croak "Cannot extract filehandle" if $self->{mode} ne "async";
548 1         18 $self->{mode} = "dead";
549              
550 1         11 return $self->{read_handle};
551             }
552              
553             sub _extract_write_handle
554             {
555 67     67   109 my $self = shift;
556              
557 67 50       313 return undef if !$self->{mode};
558              
559 0 0         croak "Cannot extract filehandle" if $self->{mode} ne "async";
560 0           $self->{mode} = "dead";
561              
562 0           return $self->{write_handle};
563             }
564              
565             =head1 AUTHOR
566              
567             Paul Evans
568              
569             =cut
570              
571             0x55AA;