File Coverage

blib/lib/IO/Async/Stream.pm
Criterion Covered Total %
statement 354 374 94.6
branch 156 204 76.4
condition 71 101 70.3
subroutine 55 59 93.2
pod 24 24 100.0
total 660 762 86.6


line stmt bran cond sub pod time code
1             # You may distribute under the terms of either the GNU General Public License
2             # or the Artistic License (the same terms as Perl itself)
3             #
4             # (C) Paul Evans, 2006-2020 -- leonerd@leonerd.org.uk
5              
6             package IO::Async::Stream;
7              
8 54     54   6740 use strict;
  54         196  
  54         1678  
9 54     54   278 use warnings;
  54         113  
  54         2512  
10              
11             our $VERSION = '0.801';
12              
13 54     54   393 use base qw( IO::Async::Handle );
  54         123  
  54         30689  
14              
15 54     54   2590 use Errno qw( EAGAIN EWOULDBLOCK EINTR EPIPE );
  54         6380  
  54         3540  
16              
17 54     54   347 use Carp;
  54         126  
  54         3081  
18              
19 54     54   33978 use Encode 2.11 qw( find_encoding STOP_AT_PARTIAL );
  54         577330  
  54         4464  
20 54     54   425 use Scalar::Util qw( blessed );
  54         119  
  54         2410  
21              
22 54     54   334 use IO::Async::Debug;
  54         132  
  54         1255  
23 54     54   3187 use IO::Async::Metrics '$METRICS';
  54         124  
  54         423  
24              
25             # Tuneable from outside
26             # Not yet documented
27             our $READLEN = 8192;
28             our $WRITELEN = 8192;
29              
30 54     54   22332 use Struct::Dumb;
  54         95092  
  54         293  
31              
32             # Element of the writequeue
33             struct Writer => [qw( data writelen on_write on_flush on_error watching )];
34              
35             # Element of the readqueue
36             struct Reader => [qw( on_read future )];
37              
38             # Bitfields in the want flags
39 54     54   5413 use constant WANT_READ_FOR_READ => 0x01;
  54         145  
  54         2999  
40 54     54   328 use constant WANT_READ_FOR_WRITE => 0x02;
  54         123  
  54         2383  
41 54     54   305 use constant WANT_WRITE_FOR_READ => 0x04;
  54         123  
  54         2349  
42 54     54   330 use constant WANT_WRITE_FOR_WRITE => 0x08;
  54         121  
  54         2795  
43 54     54   331 use constant WANT_ANY_READ => WANT_READ_FOR_READ |WANT_READ_FOR_WRITE;
  54         108  
  54         3227  
44 54     54   433 use constant WANT_ANY_WRITE => WANT_WRITE_FOR_READ|WANT_WRITE_FOR_WRITE;
  54         93  
  54         230317  
45              
46             =head1 NAME
47              
48             C - event callbacks and write bufering for a stream
49             filehandle
50              
51             =head1 SYNOPSIS
52              
53             use IO::Async::Stream;
54              
55             use IO::Async::Loop;
56             my $loop = IO::Async::Loop->new;
57              
58             my $stream = IO::Async::Stream->new(
59             read_handle => \*STDIN,
60             write_handle => \*STDOUT,
61              
62             on_read => sub {
63             my ( $self, $buffref, $eof ) = @_;
64              
65             while( $$buffref =~ s/^(.*\n)// ) {
66             print "Received a line $1";
67             }
68              
69             if( $eof ) {
70             print "EOF; last partial line is $$buffref\n";
71             }
72              
73             return 0;
74             }
75             );
76              
77             $loop->add( $stream );
78              
79             $stream->write( "An initial line here\n" );
80              
81             =head1 DESCRIPTION
82              
83             This subclass of L contains a filehandle that represents
84             a byte-stream. It provides buffering for both incoming and outgoing data. It
85             invokes the C handler when new data is read from the filehandle. Data
86             may be written to the filehandle by calling the C method.
87              
88             This class is suitable for any kind of filehandle that provides a
89             possibly-bidirectional reliable byte stream, such as a pipe, TTY, or
90             C socket (such as TCP or a byte-oriented UNIX local socket). For
91             datagram or raw message-based sockets (such as UDP) see instead
92             L.
93              
94             =cut
95              
96             =head1 EVENTS
97              
98             The following events are invoked, either using subclass methods or CODE
99             references in parameters:
100              
101             =head2 $ret = on_read \$buffer, $eof
102              
103             Invoked when more data is available in the internal receiving buffer.
104              
105             The first argument is a reference to a plain perl string. The code should
106             inspect and remove any data it likes, but is not required to remove all, or
107             indeed any of the data. Any data remaining in the buffer will be preserved for
108             the next call, the next time more data is received from the handle.
109              
110             In this way, it is easy to implement code that reads records of some form when
111             completed, but ignores partially-received records, until all the data is
112             present. If the handler wishes to be immediately invoke a second time, to have
113             another attempt at consuming more content, it should return C<1>. Otherwise,
114             it should return C<0>, and the handler will next be invoked when more data has
115             arrived from the underlying read handle and appended to the buffer. This makes
116             it easy to implement code that handles multiple incoming records at the same
117             time. Alternatively, if the handler function already attempts to consume as
118             much as possible from the buffer, it will have no need to return C<1> at all.
119             See the examples at the end of this documentation for more detail.
120              
121             The second argument is a scalar indicating whether the stream has reported an
122             end-of-file (EOF) condition. A reference to the buffer is passed to the
123             handler in the usual way, so it may inspect data contained in it. Once the
124             handler returns a false value, it will not be called again, as the handle is
125             now at EOF and no more data can arrive.
126              
127             The C code may also dynamically replace itself with a new callback
128             by returning a CODE reference instead of C<0> or C<1>. The original callback
129             or method that the object first started with may be restored by returning
130             C. Whenever the callback is changed in this way, the new code is called
131             again; even if the read buffer is currently empty. See the examples at the end
132             of this documentation for more detail.
133              
134             The C method can be used to insert new, temporary handlers that
135             take precedence over the global C handler. This event is only used if
136             there are no further pending handlers created by C.
137              
138             =head2 on_read_eof
139              
140             Optional. Invoked when the read handle indicates an end-of-file (EOF)
141             condition. If there is any data in the buffer still to be processed, the
142             C event will be invoked first, before this one.
143              
144             =head2 on_write_eof
145              
146             Optional. Invoked when the write handle indicates an end-of-file (EOF)
147             condition. Note that this condition can only be detected after a C
148             syscall returns the C error. If there is no data pending to be written
149             then it will not be detected yet.
150              
151             =head2 on_read_error $errno
152              
153             Optional. Invoked when the C method on the read handle fails.
154              
155             =head2 on_write_error $errno
156              
157             Optional. Invoked when the C method on the write handle fails.
158              
159             The C and C handlers are passed the value of
160             C<$!> at the time the error occurred. (The C<$!> variable itself, by its
161             nature, may have changed from the original error by the time this handler
162             runs so it should always use the value passed in).
163              
164             If an error occurs when the corresponding error callback is not supplied, and
165             there is not a handler for it, then the C method is called instead.
166              
167             =head2 on_read_high_watermark $length
168              
169             =head2 on_read_low_watermark $length
170              
171             Optional. Invoked when the read buffer grows larger than the high watermark
172             or smaller than the low watermark respectively. These are edge-triggered
173             events; they will only be triggered once per crossing, not continuously while
174             the buffer remains above or below the given limit.
175              
176             If these event handlers are not defined, the default behaviour is to disable
177             read-ready notifications if the read buffer grows larger than the high
178             watermark (so as to avoid it growing arbitrarily if nothing is consuming it),
179             and re-enable notifications again once something has read enough to cause it to
180             drop. If these events are overridden, the overriding code will have to perform
181             this behaviour if required, by using
182              
183             $self->want_readready_for_read(...)
184              
185             =head2 on_outgoing_empty
186              
187             Optional. Invoked when the writing data buffer becomes empty.
188              
189             =head2 on_writeable_start
190              
191             =head2 on_writeable_stop
192              
193             Optional. These two events inform when the filehandle becomes writeable, and
194             when it stops being writeable. C is invoked by the
195             C event if previously it was known to be not writeable.
196             C is invoked after a C operation fails with
197             C or C. These two events track the writeability state,
198             and ensure that only state change cause events to be invoked. A stream starts
199             off being presumed writeable, so the first of these events to be observed will
200             be C.
201              
202             =cut
203              
204             sub _init
205             {
206 712     712   2642 my $self = shift;
207              
208 712         6339 $self->{writequeue} = []; # Queue of Writers
209 712         2807 $self->{readqueue} = []; # Queue of Readers
210 712         2388 $self->{writeable} = 1; # "innocent until proven guilty" (by means of EAGAIN)
211 712         4083 $self->{readbuff} = "";
212              
213 712         3808 $self->{reader} = "_sysread";
214 712         2757 $self->{writer} = "_syswrite";
215              
216 712         1744 $self->{read_len} = $READLEN;
217 712         3102 $self->{write_len} = $WRITELEN;
218              
219 712         4344 $self->{want} = WANT_READ_FOR_READ;
220              
221 712         2674 $self->{close_on_read_eof} = 1;
222             }
223              
224             =head1 PARAMETERS
225              
226             The following named parameters may be passed to C or C:
227              
228             =head2 read_handle => IO
229              
230             The IO handle to read from. Must implement C and C methods.
231              
232             =head2 write_handle => IO
233              
234             The IO handle to write to. Must implement C and C methods.
235              
236             =head2 handle => IO
237              
238             Shortcut to specifying the same IO handle for both of the above.
239              
240             =head2 on_read => CODE
241              
242             =head2 on_read_error => CODE
243              
244             =head2 on_outgoing_empty => CODE
245              
246             =head2 on_write_error => CODE
247              
248             =head2 on_writeable_start => CODE
249              
250             =head2 on_writeable_stop => CODE
251              
252             CODE references for event handlers.
253              
254             =head2 autoflush => BOOL
255              
256             Optional. If true, the C method will attempt to write data to the
257             operating system immediately, without waiting for the loop to indicate the
258             filehandle is write-ready. This is useful, for example, on streams that should
259             contain up-to-date logging or console information.
260              
261             It currently defaults to false for any file handle, but future versions of
262             L may enable this by default on STDOUT and STDERR.
263              
264             =head2 read_len => INT
265              
266             Optional. Sets the buffer size for C calls. Defaults to 8 KiBytes.
267              
268             =head2 read_all => BOOL
269              
270             Optional. If true, attempt to read as much data from the kernel as possible
271             when the handle becomes readable. By default this is turned off, meaning at
272             most one fixed-size buffer is read. If there is still more data in the
273             kernel's buffer, the handle will still be readable, and will be read from
274             again.
275              
276             This behaviour allows multiple streams and sockets to be multiplexed
277             simultaneously, meaning that a large bulk transfer on one cannot starve other
278             filehandles of processing time. Turning this option on may improve bulk data
279             transfer rate, at the risk of delaying or stalling processing on other
280             filehandles.
281              
282             =head2 write_len => INT
283              
284             Optional. Sets the buffer size for C calls. Defaults to 8 KiBytes.
285              
286             =head2 write_all => BOOL
287              
288             Optional. Analogous to the C option, but for writing. When
289             C is enabled, this option only affects deferred writing if the
290             initial attempt failed due to buffer space.
291              
292             =head2 read_high_watermark => INT
293              
294             =head2 read_low_watermark => INT
295              
296             Optional. If defined, gives a way to implement flow control or other
297             behaviours that depend on the size of Stream's read buffer.
298              
299             If after more data is read from the underlying filehandle the read buffer is
300             now larger than the high watermark, the C event is
301             triggered (which, by default, will disable read-ready notifications and pause
302             reading from the filehandle).
303              
304             If after data is consumed by an C handler the read buffer is now
305             smaller than the low watermark, the C event is
306             triggered (which, by default, will re-enable read-ready notifications and
307             resume reading from the filehandle). For to be possible, the read handler
308             would have to be one added by the C method or one of the
309             Future-returning C methods.
310              
311             By default these options are not defined, so this behaviour will not happen.
312             C may not be set to a larger value than
313             C, but it may be set to a smaller value, creating a
314             hysteresis region. If either option is defined then both must be.
315              
316             If these options are used with the default event handlers, be careful not to
317             cause deadlocks by having a high watermark sufficiently low that a single
318             C invocation might not consider it finished yet.
319              
320             =head2 reader => STRING|CODE
321              
322             =head2 writer => STRING|CODE
323              
324             Optional. If defined, gives the name of a method or a CODE reference to use
325             to implement the actual reading from or writing to the filehandle. These will
326             be invoked as
327              
328             $stream->reader( $read_handle, $buffer, $len )
329             $stream->writer( $write_handle, $buffer, $len )
330              
331             Each is expected to modify the passed buffer; C by appending to it,
332             C by removing a prefix from it. Each is expected to return a true
333             value on success, zero on EOF, or C with C<$!> set for errors. If not
334             provided, they will be substituted by implenentations using C and
335             C on the underlying handle, respectively.
336              
337             =head2 close_on_read_eof => BOOL
338              
339             Optional. Usually true, but if set to a false value then the stream will not
340             be Cd when an EOF condition occurs on read. This is normally not useful
341             as at that point the underlying stream filehandle is no longer useable, but it
342             may be useful for reading regular files, or interacting with TTY devices.
343              
344             =head2 encoding => STRING
345              
346             If supplied, sets the name of encoding of the underlying stream. If an
347             encoding is set, then the C method will expect to receive Unicode
348             strings and encodes them into bytes, and incoming bytes will be decoded into
349             Unicode strings for the C event.
350              
351             If an encoding is not supplied then C and C will work in byte
352             strings.
353              
354             I in order to handle reads of UTF-8 content or other
355             multibyte encodings, the code implementing the C event uses a feature
356             of L; the C flag. While this flag has existed for a
357             while and is used by the C<:encoding> PerlIO layer itself for similar
358             purposes, the flag is not officially documented by the C module. In
359             principle this undocumented feature could be subject to change, in practice I
360             believe it to be reasonably stable.
361              
362             This note applies only to the C event; data written using the
363             C method does not rely on any undocumented features of C.
364              
365             If a read handle is given, it is required that either an C callback
366             reference is configured, or that the object provides an C method. It
367             is optional whether either is true for C; if neither is
368             supplied then no action will be taken when the writing buffer becomes empty.
369              
370             An C handler may be supplied even if no read handle is yet given, to
371             be used when a read handle is eventually provided by the C
372             method.
373              
374             This condition is checked at the time the object is added to a Loop; it is
375             allowed to create a C object with a read handle but without
376             a C handler, provided that one is later given using C
377             before the stream is added to its containing Loop, either directly or by being
378             a child of another Notifier already in a Loop, or added to one.
379              
380             =cut
381              
382             sub configure
383             {
384 993     993 1 5671 my $self = shift;
385 993         6233 my %params = @_;
386              
387 993         4149 for (qw( on_read on_outgoing_empty on_read_eof on_write_eof on_read_error
388             on_write_error on_writeable_start on_writeable_stop autoflush
389             read_len read_all write_len write_all on_read_high_watermark
390             on_read_low_watermark reader writer close_on_read_eof )) {
391 17874 100       35059 $self->{$_} = delete $params{$_} if exists $params{$_};
392             }
393              
394 993 100 66     8914 if( exists $params{read_high_watermark} or exists $params{read_low_watermark} ) {
395 1         3 my $high = delete $params{read_high_watermark};
396 1 50       4 defined $high or $high = $self->{read_high_watermark};
397              
398 1         3 my $low = delete $params{read_low_watermark};
399 1 50       3 defined $low or $low = $self->{read_low_watermark};
400              
401 1 50 33     7 croak "Cannot set read_low_watermark without read_high_watermark" if defined $low and !defined $high;
402 1 50 33     5 croak "Cannot set read_high_watermark without read_low_watermark" if defined $high and !defined $low;
403              
404 1 50 33     8 croak "Cannot set read_low_watermark higher than read_high_watermark" if defined $low and defined $high and $low > $high;
      33        
405              
406 1         2 $self->{read_high_watermark} = $high;
407 1         3 $self->{read_low_watermark} = $low;
408              
409             # TODO: reassert levels if we've moved them
410             }
411              
412 993 100       2463 if( exists $params{encoding} ) {
413 2         4 my $encoding = delete $params{encoding};
414 2         9 my $obj = find_encoding( $encoding );
415 2 50       226 defined $obj or croak "Cannot handle an encoding of '$encoding'";
416 2         49 $self->{encoding} = $obj;
417             }
418              
419 993         13575 $self->SUPER::configure( %params );
420              
421 993 100 100     2901 if( $self->loop and $self->read_handle ) {
422 5 50       24 $self->can_event( "on_read" ) or
423             croak 'Expected either an on_read callback or to be able to ->on_read';
424             }
425              
426 993 100 100     5913 if( $self->{autoflush} and my $write_handle = $self->write_handle ) {
427 62 50       841 carp "An IO::Async::Stream with autoflush needs an O_NONBLOCK write handle"
428             if $write_handle->blocking;
429             }
430             }
431              
432             sub _add_to_loop
433             {
434 703     703   1527 my $self = shift;
435              
436 703 100       2031 if( defined $self->read_handle ) {
437 592 100       1749 $self->can_event( "on_read" ) or
438             croak 'Expected either an on_read callback or to be able to ->on_read';
439             }
440              
441 702         7116 $self->SUPER::_add_to_loop( @_ );
442              
443 702 100       9096 if( !$self->_is_empty ) {
444 39         321 $self->want_writeready_for_write( 1 );
445             }
446             }
447              
448             =head1 METHODS
449              
450             The following methods documented with a trailing call to C<< ->get >> return
451             L instances.
452              
453             =cut
454              
455             =head2 want_readready_for_read
456              
457             =head2 want_readready_for_write
458              
459             $stream->want_readready_for_read( $set )
460              
461             $stream->want_readready_for_write( $set )
462              
463             Mutators for the C property on L, which
464             control whether the C or C behaviour should be continued once the
465             filehandle becomes ready for read.
466              
467             Normally, C is always true (though the read watermark
468             behaviour can modify it), and C is not used.
469             However, if a custom C function is provided, it may find this useful
470             for being invoked again if it cannot proceed with a write operation until the
471             filehandle becomes readable (such as during transport negotiation or SSL key
472             management, for example).
473              
474             =cut
475              
476             sub want_readready_for_read
477             {
478 0     0 1 0 my $self = shift;
479 0         0 my ( $set ) = @_;
480 0 0       0 $set ? ( $self->{want} |= WANT_READ_FOR_READ ) : ( $self->{want} &= ~WANT_READ_FOR_READ );
481              
482 0 0       0 $self->want_readready( $self->{want} & WANT_ANY_READ ) if $self->read_handle;
483             }
484              
485             sub want_readready_for_write
486             {
487 1     1 1 356 my $self = shift;
488 1         3 my ( $set ) = @_;
489 1 50       5 $set ? ( $self->{want} |= WANT_READ_FOR_WRITE ) : ( $self->{want} &= ~WANT_READ_FOR_WRITE );
490              
491 1 50       3 $self->want_readready( $self->{want} & WANT_ANY_READ ) if $self->read_handle;
492             }
493              
494             =head2 want_writeready_for_read
495              
496             =head2 want_writeready_for_write
497              
498             $stream->want_writeready_for_write( $set )
499              
500             $stream->want_writeready_for_read( $set )
501              
502             Mutators for the C property on L, which
503             control whether the C or C behaviour should be continued once the
504             filehandle becomes ready for write.
505              
506             Normally, C is managed by the C method and
507             associated flushing, and C is not used. However, if
508             a custom C function is provided, it may find this useful for being
509             invoked again if it cannot proceed with a read operation until the filehandle
510             becomes writable (such as during transport negotiation or SSL key management,
511             for example).
512              
513             =cut
514              
515             sub want_writeready_for_write
516             {
517 223     223 1 532 my $self = shift;
518 223         442 my ( $set ) = @_;
519 223 100       853 $set ? ( $self->{want} |= WANT_WRITE_FOR_WRITE ) : ( $self->{want} &= ~WANT_WRITE_FOR_WRITE );
520              
521 223 100       706 $self->want_writeready( $self->{want} & WANT_ANY_WRITE ) if $self->write_handle;
522             }
523              
524             sub want_writeready_for_read
525             {
526 1     1 1 598 my $self = shift;
527 1         4 my ( $set ) = @_;
528 1 50       5 $set ? ( $self->{want} |= WANT_WRITE_FOR_READ ) : ( $self->{want} &= ~WANT_WRITE_FOR_READ );
529              
530 1 50       8 $self->want_writeready( $self->{want} & WANT_ANY_WRITE ) if $self->write_handle;
531             }
532              
533             # FUNCTION not method
534             sub _nonfatal_error
535             {
536 9     9   22 my ( $errno ) = @_;
537              
538 9   66     80 return $errno == EAGAIN ||
539             $errno == EWOULDBLOCK ||
540             $errno == EINTR;
541             }
542              
543             sub _is_empty
544             {
545 1327     1327   2387 my $self = shift;
546 1327         2271 return !@{ $self->{writequeue} };
  1327         25345  
547             }
548              
549             =head2 close
550              
551             $stream->close
552              
553             A synonym for C. This should not be used when the deferred
554             wait behaviour is required, as the behaviour of C may change in a
555             future version of L. Instead, call C directly.
556              
557             =cut
558              
559             sub close
560             {
561 8     8 1 1101 my $self = shift;
562 8         46 $self->close_when_empty;
563             }
564              
565             =head2 close_when_empty
566              
567             $stream->close_when_empty
568              
569             If the write buffer is empty, this method calls C on the underlying IO
570             handles, and removes the stream from its containing loop. If the write buffer
571             still contains data, then this is deferred until the buffer is empty. This is
572             intended for "write-then-close" one-shot streams.
573              
574             $stream->write( "Here is my final data\n" );
575             $stream->close_when_empty;
576              
577             Because of this deferred nature, it may not be suitable for error handling.
578             See instead the C method.
579              
580             =cut
581              
582             sub close_when_empty
583             {
584 173     173 1 1963 my $self = shift;
585              
586 173 100       358 return $self->SUPER::close if $self->_is_empty;
587              
588 8         87 $self->{stream_closing} = 1;
589             }
590              
591             =head2 close_now
592              
593             $stream->close_now
594              
595             This method immediately closes the underlying IO handles and removes the
596             stream from the containing loop. It will not wait to flush the remaining data
597             in the write buffer.
598              
599             =cut
600              
601             sub close_now
602             {
603 478     478 1 920 my $self = shift;
604              
605 478         1031 foreach ( @{ $self->{writequeue} } ) {
  478         1716  
606 2 100       8 $_->on_error->( $self, "stream closing" ) if $_->on_error;
607             }
608              
609 478         793 undef @{ $self->{writequeue} };
  478         1453  
610 478         2202 undef $self->{stream_closing};
611              
612 478         2820 $self->SUPER::close;
613             }
614              
615             =head2 is_read_eof
616              
617             =head2 is_write_eof
618              
619             $eof = $stream->is_read_eof
620              
621             $eof = $stream->is_write_eof
622              
623             Returns true after an EOF condition is reported on either the read or the
624             write handle, respectively.
625              
626             =cut
627              
628             sub is_read_eof
629             {
630 2     2 1 50 my $self = shift;
631 2         10 return $self->{read_eof};
632             }
633              
634             sub is_write_eof
635             {
636 2     2 1 38 my $self = shift;
637 2         9 return $self->{write_eof};
638             }
639              
640             =head2 write
641              
642             $stream->write( $data, %params )
643              
644             This method adds data to the outgoing data queue, or writes it immediately,
645             according to the C parameter.
646              
647             If the C option is set, this method will try immediately to write
648             the data to the underlying filehandle. If this completes successfully then it
649             will have been written by the time this method returns. If it fails to write
650             completely, then the data is queued as if C were not set, and will
651             be flushed as normal.
652              
653             C<$data> can either be a plain string, a L, or a CODE reference. If it
654             is a plain string it is written immediately. If it is not, its value will be
655             used to generate more C<$data> values, eventually leading to strings to be
656             written.
657              
658             If C<$data> is a C, the Stream will wait until it is ready, and take
659             the single value it yields.
660              
661             If C<$data> is a CODE reference, it will be repeatedly invoked to generate new
662             values. Each time the filehandle is ready to write more data to it, the
663             function is invoked. Once the function has finished generating data it should
664             return undef. The function is passed the Stream object as its first argument.
665              
666             It is allowed that Cs yield CODE references, or CODE references return
667             Cs, as well as plain strings.
668              
669             For example, to stream the contents of an existing opened filehandle:
670              
671             open my $fileh, "<", $path or die "Cannot open $path - $!";
672              
673             $stream->write( sub {
674             my ( $stream ) = @_;
675              
676             sysread $fileh, my $buffer, 8192 or return;
677             return $buffer;
678             } );
679              
680             Takes the following optional named parameters in C<%params>:
681              
682             =over 8
683              
684             =item write_len => INT
685              
686             Overrides the C parameter for the data written by this call.
687              
688             =item on_write => CODE
689              
690             A CODE reference which will be invoked after every successful C
691             operation on the underlying filehandle. It will be passed the number of bytes
692             that were written by this call, which may not be the entire length of the
693             buffer - if it takes more than one C operation to empty the buffer
694             then this callback will be invoked multiple times.
695              
696             $on_write->( $stream, $len )
697              
698             =item on_flush => CODE
699              
700             A CODE reference which will be invoked once the data queued by this C
701             call has been flushed. This will be invoked even if the buffer itself is not
702             yet empty; if more data has been queued since the call.
703              
704             $on_flush->( $stream )
705              
706             =item on_error => CODE
707              
708             A CODE reference which will be invoked if a C error happens while
709             performing this write. Invoked as for the C's C event.
710              
711             $on_error->( $stream, $errno )
712              
713             =back
714              
715             If the object is not yet a member of a loop and doesn't yet have a
716             C, then calls to the C method will simply queue the data
717             and return. It will be flushed when the object is added to the loop.
718              
719             If C<$data> is a defined but empty string, the write is still queued, and the
720             C continuation will be invoked, if supplied. This can be used to
721             obtain a marker, to invoke some code once the output queue has been flushed up
722             to this point.
723              
724             =head2 write (scalar)
725              
726             $stream->write( ... )->get
727              
728             If called in non-void context, this method returns a L which will
729             complete (with no value) when the write operation has been flushed. This may
730             be used as an alternative to, or combined with, the C callback.
731              
732             =cut
733              
734             sub _syswrite
735             {
736 166     166   1777 my $self = shift;
737 166         443 my ( $handle, undef, $len ) = @_;
738              
739 166         2183 my $written = $handle->syswrite( $_[1], $len );
740 166 100       6971 return $written if !$written; # zero or undef
741              
742 158         579 substr( $_[1], 0, $written ) = "";
743 158         421 return $written;
744             }
745              
746             sub _flush_one_write
747             {
748 177     177   422 my $self = shift;
749              
750 177         368 my $writequeue = $self->{writequeue};
751              
752 177         362 my $head;
753 177   66     2654 while( $head = $writequeue->[0] and ref $head->data ) {
754 18 100 33     213 if( ref $head->data eq "CODE" ) {
    50          
755 12         73 my $data = $head->data->( $self );
756 12 100       2438 if( !defined $data ) {
757 5 50       17 $head->on_flush->( $self ) if $head->on_flush;
758 5         600 shift @$writequeue;
759 5         55 return 1;
760             }
761 7 100 100     38 if( !ref $data and my $encoding = $self->{encoding} ) {
762 1         7 $data = $encoding->encode( $data );
763             }
764 7         21 unshift @$writequeue, my $new = Writer(
765             $data, $head->writelen, $head->on_write, undef, undef, 0
766             );
767 7         101 next;
768             }
769             elsif( blessed $head->data and $head->data->isa( "Future" ) ) {
770 6         125 my $f = $head->data;
771 6 100       37 if( !$f->is_ready ) {
772 2 50       12 return 0 if $head->watching;
773 2     2   23 $f->on_ready( sub { $self->_flush_one_write } );
  2         721  
774 2         61 $head->watching++;
775 2         15 return 0;
776             }
777 4         36 my $data = $f->get;
778 4 100 100     81 if( !ref $data and my $encoding = $self->{encoding} ) {
779 1         6 $data = $encoding->encode( $data );
780             }
781 4         10 $head->data = $data;
782 4         32 next;
783             }
784             else {
785 0         0 die "Unsure what to do with reference ".ref($head->data)." in write queue";
786             }
787             }
788              
789 170         3942 my $second;
790 170   100     816 while( $second = $writequeue->[1] and
      66        
      66        
      33        
      33        
791             !ref $second->data and
792             $head->writelen == $second->writelen and
793             !$head->on_write and !$second->on_write and
794             !$head->on_flush ) {
795 1         41 $head->data .= $second->data;
796 1         10 $head->on_write = $second->on_write;
797 1         8 $head->on_flush = $second->on_flush;
798 1         11 splice @$writequeue, 1, 1, ();
799             }
800              
801 170 50       688 die "TODO: head data does not contain a plain string" if ref $head->data;
802              
803 170 50       1175 if( $IO::Async::Debug::DEBUG > 1 ) {
804 0         0 my $data = substr $head->data, 0, $head->writelen;
805 0         0 $self->debug_printf( "WRITE len=%d", length $data );
806 0 0       0 IO::Async::Debug::log_hexdump( $data ) if $IO::Async::Debug::DEBUG_FLAGS{Sw};
807             }
808              
809 170         655 my $writer = $self->{writer};
810 170         730 my $len = $self->$writer( $self->write_handle, $head->data, $head->writelen );
811              
812 170 100       679 if( !defined $len ) {
813 3         17 my $errno = $!;
814              
815 3 100 66     95 if( $errno == EAGAIN or $errno == EWOULDBLOCK ) {
816 1 50       14 $self->maybe_invoke_event( on_writeable_stop => ) if $self->{writeable};
817 1         6 $self->{writeable} = 0;
818             }
819              
820 3 100       9 return 0 if _nonfatal_error( $errno );
821              
822 2 50       9 $self->debug_printf( "WRITE err=%d/%s", $errno, $errno ) if $IO::Async::Debug::DEBUG > 1;
823              
824 2 100       6 if( $errno == EPIPE ) {
825 1         17 $self->debug_printf( "WRITE-EOF" );
826 1         3 $self->{write_eof} = 1;
827 1         3 $self->maybe_invoke_event( on_write_eof => );
828             }
829              
830 2 50       10 $head->on_error->( $self, $errno ) if $head->on_error;
831 2 100       7 $self->maybe_invoke_event( on_write_error => $errno )
832             or $self->close_now;
833              
834 2         20 return 0;
835             }
836              
837 167 100 66     944 $METRICS and $METRICS->inc_counter_by( stream_written => $len ) if $len;
838              
839 167 100       4200 if( my $on_write = $head->on_write ) {
840 3         22 $on_write->( $self, $len );
841             }
842              
843 167 100       2392 if( !length $head->data ) {
844 157 100       1184 $head->on_flush->( $self ) if $head->on_flush;
845 157         1284 shift @{ $self->{writequeue} };
  157         404  
846             }
847              
848 167         1736 return 1;
849             }
850              
851             sub write
852             {
853 169     169 1 6722 my $self = shift;
854 169         732 my ( $data, %params ) = @_;
855              
856 169 50 0     673 carp "Cannot write data to a Stream that is closing" and return if $self->{stream_closing};
857              
858             # Allow writes without a filehandle if we're not yet in a Loop, just don't
859             # try to flush them
860 169         614 my $handle = $self->write_handle;
861              
862 169 100 100     952 croak "Cannot write data to a Stream with no write_handle" if !$handle and $self->loop;
863              
864 168 100 100     1754 if( !ref $data and my $encoding = $self->{encoding} ) {
865 2         13 $data = $encoding->encode( $data );
866             }
867              
868 168         464 my $on_write = delete $params{on_write};
869 168         451 my $on_flush = delete $params{on_flush};
870 168         376 my $on_error = delete $params{on_error};
871              
872 168         344 my $f;
873 168 100       491 if( defined wantarray ) {
874 3         5 my $orig_on_flush = $on_flush;
875 3         5 my $orig_on_error = $on_error;
876              
877 3 50       10 my $loop = $self->loop or
878             croak "Cannot ->write data returning a Future to a Stream not in a Loop";
879 3         16 $f = $loop->new_future;
880             $on_flush = sub {
881 1     1   17 $f->done;
882 1 50       54 $orig_on_flush->( @_ ) if $orig_on_flush;
883 3         15 };
884             $on_error = sub {
885 3     3   33 my $self = shift;
886 3         6 my ( $errno ) = @_;
887              
888 3 100       12 $f->fail( "write failed: $errno", syswrite => $errno ) unless $f->is_ready;
889              
890 3 50       130 $orig_on_error->( $self, @_ ) if $orig_on_error;
891 3         14 };
892             }
893              
894 168         350 my $write_len = $params{write_len};
895 168 50       561 defined $write_len or $write_len = $self->{write_len};
896              
897 168         286 push @{ $self->{writequeue} }, Writer(
  168         1617  
898             $data, $write_len, $on_write, $on_flush, $on_error, 0
899             );
900              
901 168 50       3408 keys %params and croak "Unrecognised keys for ->write - " . join( ", ", keys %params );
902              
903 168 100       600 return $f unless $handle;
904              
905 132 100       316 if( $self->{autoflush} ) {
906 104   66     445 1 while !$self->_is_empty and $self->_flush_one_write;
907              
908 104 50       214 if( $self->_is_empty ) {
909 104         472 $self->want_writeready_for_write( 0 );
910 104         491 return $f;
911             }
912             }
913              
914 28         87 $self->want_writeready_for_write( 1 );
915 28         77 return $f;
916             }
917              
918             sub on_write_ready
919             {
920 68     68 1 285 my $self = shift;
921              
922 68 100       510 if( !$self->{writeable} ) {
923 1         5 $self->maybe_invoke_event( on_writeable_start => );
924 1         5 $self->{writeable} = 1;
925             }
926              
927 68 100       665 $self->_do_write if $self->{want} & WANT_WRITE_FOR_WRITE;
928 68 100       263 $self->_do_read if $self->{want} & WANT_WRITE_FOR_READ;
929             }
930              
931             sub _do_write
932             {
933 68     68   261 my $self = shift;
934              
935 68   100     356 1 while !$self->_is_empty and $self->_flush_one_write and $self->{write_all};
      100        
936              
937             # All data successfully flushed
938 68 100       254 if( $self->_is_empty ) {
939 51         249 $self->want_writeready_for_write( 0 );
940              
941 51         387 $self->maybe_invoke_event( on_outgoing_empty => );
942              
943 51 100       215 $self->close_now if $self->{stream_closing};
944             }
945             }
946              
947             sub _flush_one_read
948             {
949 1152     1152   2131 my $self = shift;
950 1152         2625 my ( $eof ) = @_;
951              
952 1152         4366 local $self->{flushing_read} = 1;
953              
954 1152         2183 my $readqueue = $self->{readqueue};
955              
956 1152         1621 my $ret;
957 1152 100 66     3715 if( $readqueue->[0] and my $on_read = $readqueue->[0]->on_read ) {
958 17         205 $ret = $on_read->( $self, \$self->{readbuff}, $eof );
959             }
960             else {
961 1135         5958 $ret = $self->invoke_event( on_read => \$self->{readbuff}, $eof );
962             }
963              
964 1152 100 100     5645 if( defined $self->{read_low_watermark} and $self->{at_read_high_watermark} and
      66        
965             length $self->{readbuff} < $self->{read_low_watermark} ) {
966 1         2 undef $self->{at_read_high_watermark};
967 1         4 $self->invoke_event( on_read_low_watermark => length $self->{readbuff} );
968             }
969              
970 1152 100 100     4426 if( ref $ret eq "CODE" ) {
    100          
971             # Replace the top CODE, or add it if there was none
972 1         5 $readqueue->[0] = Reader( $ret, undef );
973 1         15 return 1;
974             }
975             elsif( @$readqueue and !defined $ret ) {
976 13         23 shift @$readqueue;
977 13         103 return 1;
978             }
979             else {
980 1138   100     8965 return $ret && ( length( $self->{readbuff} ) > 0 || $eof );
981             }
982             }
983              
984             sub _sysread
985             {
986 943     943   2296 my $self = shift;
987 943         2556 my ( $handle, undef, $len ) = @_;
988 943         6603 return $handle->sysread( $_[1], $len );
989             }
990              
991             sub on_read_ready
992             {
993 942     942 1 2338 my $self = shift;
994              
995 942 50       6361 $self->_do_read if $self->{want} & WANT_READ_FOR_READ;
996 942 100       10674 $self->_do_write if $self->{want} & WANT_READ_FOR_WRITE;
997             }
998              
999             sub _do_read
1000             {
1001 943     943   1714 my $self = shift;
1002              
1003 943         3432 my $handle = $self->read_handle;
1004 943         2293 my $reader = $self->{reader};
1005              
1006 943         1558 while(1) {
1007 947         1378 my $data;
1008 947         4213 my $len = $self->$reader( $handle, $data, $self->{read_len} );
1009              
1010 947 100       19455 if( !defined $len ) {
1011 6         53 my $errno = $!;
1012              
1013 6 100       19 return if _nonfatal_error( $errno );
1014              
1015 4 50       14 $self->debug_printf( "READ err=%d/%s", $errno, $errno ) if $IO::Async::Debug::DEBUG > 1;
1016              
1017 4 100       10 $self->maybe_invoke_event( on_read_error => $errno )
1018             or $self->close_now;
1019              
1020 4         22 foreach ( @{ $self->{readqueue} } ) {
  4         9  
1021 1 50       6 $_->future->fail( "read failed: $errno", sysread => $errno ) if $_->future;
1022             }
1023 4         75 undef @{ $self->{readqueue} };
  4         10  
1024              
1025 4         10 return;
1026             }
1027              
1028 941 50       2685 if( $IO::Async::Debug::DEBUG > 1 ) {
1029 0         0 $self->debug_printf( "READ len=%d", $len );
1030 0 0       0 IO::Async::Debug::log_hexdump( $data ) if $IO::Async::Debug::DEBUG_FLAGS{Sr};
1031             }
1032              
1033 941 100 66     3564 $METRICS and $METRICS->inc_counter_by( stream_read => $len ) if $len;
1034              
1035 941         10872 my $eof = $self->{read_eof} = ( $len == 0 );
1036              
1037 941 100       2854 if( my $encoding = $self->{encoding} ) {
1038 4 100       14 my $bytes = defined $self->{bytes_remaining} ? $self->{bytes_remaining} . $data : $data;
1039 4         32 $data = $encoding->decode( $bytes, STOP_AT_PARTIAL );
1040 4         29 $self->{bytes_remaining} = $bytes;
1041             }
1042              
1043 941 100       2967 $self->{readbuff} .= $data if !$eof;
1044              
1045 941         3703 1 while $self->_flush_one_read( $eof );
1046              
1047 941 100       2600 if( $eof ) {
1048 476         2797 $self->debug_printf( "READ-EOF" );
1049 476         2028 $self->maybe_invoke_event( on_read_eof => );
1050 476 100       3412 $self->close_now if $self->{close_on_read_eof};
1051 476         1184 foreach ( @{ $self->{readqueue} } ) {
  476         1381  
1052 0 0       0 $_->future->done( undef ) if $_->future;
1053             }
1054 476         774 undef @{ $self->{readqueue} };
  476         1092  
1055 476         1272 return;
1056             }
1057              
1058 465 100       1815 last unless $self->{read_all};
1059             }
1060              
1061 461 100 66     1667 if( defined $self->{read_high_watermark} and length $self->{readbuff} >= $self->{read_high_watermark} ) {
1062             $self->{at_read_high_watermark} or
1063 1 50       6 $self->invoke_event( on_read_high_watermark => length $self->{readbuff} );
1064              
1065 1         4 $self->{at_read_high_watermark} = 1;
1066             }
1067             }
1068              
1069             sub on_read_high_watermark
1070             {
1071 0     0 1 0 my $self = shift;
1072 0         0 $self->want_readready_for_read( 0 );
1073             }
1074              
1075             sub on_read_low_watermark
1076             {
1077 0     0 1 0 my $self = shift;
1078 0         0 $self->want_readready_for_read( 1 );
1079             }
1080              
1081             =head2 push_on_read
1082              
1083             $stream->push_on_read( $on_read )
1084              
1085             Pushes a new temporary C handler to the end of the queue. This queue,
1086             if non-empty, is used to provide C event handling code in preference
1087             to using the object's main event handler or method. New handlers can be
1088             supplied at any time, and they will be used in first-in first-out (FIFO)
1089             order.
1090              
1091             As with the main C event handler, each can return a (defined) boolean
1092             to indicate if they wish to be invoked again or not, another C reference
1093             to replace themself with, or C to indicate it is now complete and
1094             should be removed. When a temporary handler returns C it is shifted
1095             from the queue and the next one, if present, is invoked instead. If there are
1096             no more then the object's main handler is invoked instead.
1097              
1098             =cut
1099              
1100             sub push_on_read
1101             {
1102 13     13 1 44 my $self = shift;
1103 13         31 my ( $on_read, %args ) = @_;
1104             # %args undocumented for internal use
1105              
1106 13         18 push @{ $self->{readqueue} }, Reader( $on_read, $args{future} );
  13         48  
1107              
1108             # TODO: Should this always defer?
1109 13 100       102 return if $self->{flushing_read};
1110 12   100     46 1 while length $self->{readbuff} and $self->_flush_one_read( 0 );
1111             }
1112              
1113             =head1 FUTURE-RETURNING READ METHODS
1114              
1115             The following methods all return a L which will become ready when
1116             enough data has been read by the Stream into its buffer. At this point, the
1117             data is removed from the buffer and given to the C object to complete
1118             it.
1119              
1120             my $f = $stream->read_...
1121              
1122             my ( $string ) = $f->get;
1123              
1124             Unlike the C event handlers, these methods don't allow for access to
1125             "partial" results; they only provide the final result once it is ready.
1126              
1127             If a C is cancelled before it completes it is removed from the read
1128             queue without consuming any data; i.e. each C atomically either
1129             completes or is cancelled.
1130              
1131             Since it is possible to use a readable C entirely using these
1132             C-returning methods instead of the C event, it may be useful
1133             to configure a trivial return-false event handler to keep it from consuming
1134             any input, and to allow it to be added to a C in the first place.
1135              
1136             my $stream = IO::Async::Stream->new( on_read => sub { 0 }, ... );
1137             $loop->add( $stream );
1138              
1139             my $f = $stream->read_...
1140              
1141             If a read EOF or error condition happens while there are read Cs
1142             pending, they are all completed. In the case of a read EOF, they are done with
1143             C; in the case of a read error they are failed using the C<$!> error
1144             value as the failure.
1145              
1146             $f->fail( $message, sysread => $! )
1147              
1148             If a read EOF condition happens to the currently-processing read C, it
1149             will return a partial result. The calling code can detect this by the fact
1150             that the returned data is not complete according to the specification (too
1151             short in C's case, or lacking the ending pattern in
1152             C's case). Additionally, each C will yield the C<$eof>
1153             value in its results.
1154              
1155             my ( $string, $eof ) = $f->get;
1156              
1157             =cut
1158              
1159             sub _read_future
1160             {
1161 11     11   17 my $self = shift;
1162 11         29 my $f = $self->loop->new_future;
1163             $f->on_cancel( $self->_capture_weakself( sub {
1164 1 50   1   4 my $self = shift or return;
1165 1         4 1 while $self->_flush_one_read;
1166 11         50 }));
1167 11         214 return $f;
1168             }
1169              
1170             =head2 read_atmost
1171              
1172             =head2 read_exactly
1173              
1174             ( $string, $eof ) = $stream->read_atmost( $len )->get
1175              
1176             ( $string, $eof ) = $stream->read_exactly( $len )->get
1177              
1178             Completes the C when the read buffer contains C<$len> or more
1179             characters of input. C will also complete after the first
1180             invocation of C, even if fewer characters are available, whereas
1181             C will wait until at least C<$len> are available.
1182              
1183             =cut
1184              
1185             sub read_atmost
1186             {
1187 2     2 1 579 my $self = shift;
1188 2         5 my ( $len ) = @_;
1189              
1190 2         6 my $f = $self->_read_future;
1191             $self->push_on_read( sub {
1192 1     1   4 my ( undef, $buffref, $eof ) = @_;
1193 1 50       12 return undef if $f->is_cancelled;
1194 1         17 $f->done( substr( $$buffref, 0, $len, "" ), $eof );
1195 1         54 return undef;
1196 2         12 }, future => $f );
1197 2         8 return $f;
1198             }
1199              
1200             sub read_exactly
1201             {
1202 4     4 1 1132 my $self = shift;
1203 4         11 my ( $len ) = @_;
1204              
1205 4         8 my $f = $self->_read_future;
1206             $self->push_on_read( sub {
1207 4     4   11 my ( undef, $buffref, $eof ) = @_;
1208 4 50       12 return undef if $f->is_cancelled;
1209 4 50 33     34 return 0 unless $eof or length $$buffref >= $len;
1210 4         20 $f->done( substr( $$buffref, 0, $len, "" ), $eof );
1211 4         240 return undef;
1212 4         23 }, future => $f );
1213 4         36 return $f;
1214             }
1215              
1216             =head2 read_until
1217              
1218             ( $string, $eof ) = $stream->read_until( $end )->get
1219              
1220             Completes the C when the read buffer contains a match for C<$end>,
1221             which may either be a plain string or a compiled C reference. Yields
1222             the prefix of the buffer up to and including this match.
1223              
1224             =cut
1225              
1226             sub read_until
1227             {
1228 4     4 1 1795 my $self = shift;
1229 4         10 my ( $until ) = @_;
1230              
1231 4 100       35 ref $until or $until = qr/\Q$until\E/;
1232              
1233 4         11 my $f = $self->_read_future;
1234             $self->push_on_read( sub {
1235 5     5   12 my ( undef, $buffref, $eof ) = @_;
1236 5 100       12 return undef if $f->is_cancelled;
1237 4 100       51 if( $$buffref =~ $until ) {
    50          
1238 3         23 $f->done( substr( $$buffref, 0, $+[0], "" ), $eof );
1239 3         114 return undef;
1240             }
1241             elsif( $eof ) {
1242 0         0 $f->done( $$buffref, $eof ); $$buffref = "";
  0         0  
1243 0         0 return undef;
1244             }
1245             else {
1246 1         4 return 0;
1247             }
1248 4         20 }, future => $f );
1249 4         9 return $f;
1250             }
1251              
1252             =head2 read_until_eof
1253              
1254             ( $string, $eof ) = $stream->read_until_eof->get
1255              
1256             Completes the C when the stream is eventually closed at EOF, and
1257             yields all of the data that was available.
1258              
1259             =cut
1260              
1261             sub read_until_eof
1262             {
1263 1     1 1 617 my $self = shift;
1264              
1265 1         4 my $f = $self->_read_future;
1266             $self->push_on_read( sub {
1267 2     2   5 my ( undef, $buffref, $eof ) = @_;
1268 2 50       8 return undef if $f->is_cancelled;
1269 2 100       15 return 0 unless $eof;
1270 1         5 $f->done( $$buffref, $eof ); $$buffref = "";
  1         41  
1271 1         3 return undef;
1272 1         8 }, future => $f );
1273 1         3 return $f;
1274             }
1275              
1276             =head1 UTILITY CONSTRUCTORS
1277              
1278             =cut
1279              
1280             =head2 new_for_stdin
1281              
1282             =head2 new_for_stdout
1283              
1284             =head2 new_for_stdio
1285              
1286             $stream = IO::Async::Stream->new_for_stdin
1287              
1288             $stream = IO::Async::Stream->new_for_stdout
1289              
1290             $stream = IO::Async::Stream->new_for_stdio
1291              
1292             Return a C object preconfigured with the correct
1293             C, C or both.
1294              
1295             =cut
1296              
1297 1     1 1 17 sub new_for_stdin { shift->new( read_handle => \*STDIN, @_ ) }
1298 1     1 1 15 sub new_for_stdout { shift->new( write_handle => \*STDOUT, @_ ) }
1299              
1300 1     1 1 340 sub new_for_stdio { shift->new( read_handle => \*STDIN, write_handle => \*STDOUT, @_ ) }
1301              
1302             =head2 connect
1303              
1304             $future = $stream->connect( %args )
1305              
1306             A convenient wrapper for calling the C method on the underlying
1307             L object, passing the C hint as C if not
1308             otherwise supplied.
1309              
1310             =cut
1311              
1312             sub connect
1313             {
1314 0     0 1   my $self = shift;
1315 0           return $self->SUPER::connect( socktype => "stream", @_ );
1316             }
1317              
1318             =head1 DEBUGGING FLAGS
1319              
1320             The following flags in C enable extra logging:
1321              
1322             =over 4
1323              
1324             =item C
1325              
1326             Log byte buffers as data is read from a Stream
1327              
1328             =item C
1329              
1330             Log byte buffers as data is written to a Stream
1331              
1332             =back
1333              
1334             =cut
1335              
1336             =head1 EXAMPLES
1337              
1338             =head2 A line-based C method
1339              
1340             The following C method accepts incoming C<\n>-terminated lines and
1341             prints them to the program's C stream.
1342              
1343             sub on_read
1344             {
1345             my $self = shift;
1346             my ( $buffref, $eof ) = @_;
1347              
1348             while( $$buffref =~ s/^(.*\n)// ) {
1349             print "Received a line: $1";
1350             }
1351              
1352             return 0;
1353             }
1354              
1355             Because a reference to the buffer itself is passed, it is simple to use a
1356             C regular expression on the scalar it points at, to both check if data
1357             is ready (i.e. a whole line), and to remove it from the buffer. Since it
1358             always removes as many complete lines as possible, it doesn't need invoking
1359             again when it has finished, so it can return a constant C<0>.
1360              
1361             =head2 Reading binary data
1362              
1363             This C method accepts incoming records in 16-byte chunks, printing
1364             each one.
1365              
1366             sub on_read
1367             {
1368             my ( $self, $buffref, $eof ) = @_;
1369              
1370             if( length $$buffref >= 16 ) {
1371             my $record = substr( $$buffref, 0, 16, "" );
1372             print "Received a 16-byte record: $record\n";
1373              
1374             return 1;
1375             }
1376              
1377             if( $eof and length $$buffref ) {
1378             print "EOF: a partial record still exists\n";
1379             }
1380              
1381             return 0;
1382             }
1383              
1384             This time, rather than a C loop we have decided to have the handler
1385             just process one record, and use the C mechanism to ask that the
1386             handler be invoked again if there still remains data that might contain
1387             another record; only stopping with C when we know we can't find one.
1388              
1389             The 4-argument form of C extracts the 16-byte record from the buffer
1390             and assigns it to the C<$record> variable, if there was enough data in the
1391             buffer to extract it.
1392              
1393             A lot of protocols use a fixed-size header, followed by a variable-sized body
1394             of data, whose size is given by one of the fields of the header. The following
1395             C method extracts messages in such a protocol.
1396              
1397             sub on_read
1398             {
1399             my ( $self, $buffref, $eof ) = @_;
1400              
1401             return 0 unless length $$buffref >= 8; # "N n n" consumes 8 bytes
1402              
1403             my ( $len, $x, $y ) = unpack "N n n", $$buffref;
1404              
1405             return 0 unless length $$buffref >= 8 + $len;
1406              
1407             substr( $$buffref, 0, 8, "" );
1408             my $data = substr( $$buffref, 0, $len, "" );
1409              
1410             print "A record with values x=$x y=$y\n";
1411              
1412             return 1;
1413             }
1414              
1415             In this example, the header is Ced first, to extract the body
1416             length, and then the body is extracted. If the buffer does not have enough
1417             data yet for a complete message then C<0> is returned, and the buffer is left
1418             unmodified for next time. Only when there are enough bytes in total does it
1419             use C to remove them.
1420              
1421             =head2 Dynamic replacement of C
1422              
1423             Consider the following protocol (inspired by IMAP), which consists of
1424             C<\n>-terminated lines that may have an optional data block attached. The
1425             presence of such a data block, as well as its size, is indicated by the line
1426             prefix.
1427              
1428             sub on_read
1429             {
1430             my $self = shift;
1431             my ( $buffref, $eof ) = @_;
1432              
1433             if( $$buffref =~ s/^DATA (\d+):(.*)\n// ) {
1434             my $length = $1;
1435             my $line = $2;
1436              
1437             return sub {
1438             my $self = shift;
1439             my ( $buffref, $eof ) = @_;
1440              
1441             return 0 unless length $$buffref >= $length;
1442              
1443             # Take and remove the data from the buffer
1444             my $data = substr( $$buffref, 0, $length, "" );
1445              
1446             print "Received a line $line with some data ($data)\n";
1447              
1448             return undef; # Restore the original method
1449             }
1450             }
1451             elsif( $$buffref =~ s/^LINE:(.*)\n// ) {
1452             my $line = $1;
1453              
1454             print "Received a line $line with no data\n";
1455              
1456             return 1;
1457             }
1458             else {
1459             print STDERR "Unrecognised input\n";
1460             # Handle it somehow
1461             }
1462             }
1463              
1464             In the case where trailing data is supplied, a new temporary C
1465             callback is provided in a closure. This closure captures the C<$length>
1466             variable so it knows how much data to expect. It also captures the C<$line>
1467             variable so it can use it in the event report. When this method has finished
1468             reading the data, it reports the event, then restores the original method by
1469             returning C.
1470              
1471             =head1 SEE ALSO
1472              
1473             =over 4
1474              
1475             =item *
1476              
1477             L - Supply object methods for I/O handles
1478              
1479             =back
1480              
1481             =head1 AUTHOR
1482              
1483             Paul Evans
1484              
1485             =cut
1486              
1487             0x55AA;