File Coverage

blib/lib/Future/IO.pm
Criterion Covered Total %
statement 149 157 94.9
branch 26 36 72.2
condition 37 61 60.6
subroutine 36 39 92.3
pod 12 12 100.0
total 260 305 85.2


line stmt bran cond sub pod time code
1             # You may distribute under the terms of either the GNU General Public License
2             # or the Artistic License (the same terms as Perl itself)
3             #
4             # (C) Paul Evans, 2019-2023 -- leonerd@leonerd.org.uk
5              
6             package Future::IO 0.13;
7              
8 13     13   1407666 use v5.14;
  13         98  
9 13     13   76 use warnings;
  13         26  
  13         407  
10              
11 13     13   108 use Carp;
  13         30  
  13         15773  
12              
13             # These need to be visible to sub override_impl
14             my @alarms;
15             my @readers;
16             my @writers;
17              
18             our $IMPL;
19              
20             our $MAX_READLEN = 8192;
21             our $MAX_WRITELEN = 8192;
22              
23             =head1 NAME
24              
25             C - Future-returning IO methods
26              
27             =head1 SYNOPSIS
28              
29             use Future::IO;
30              
31             my $delay = Future::IO->sleep( 5 );
32             # $delay will become done in 5 seconds time
33              
34             my $input = Future::IO->sysread( \*STDIN, 4096 );
35             # $input will yield some input from the STDIN IO handle
36              
37             =head1 DESCRIPTION
38              
39             This package provides a few basic methods that behave similarly to the
40             same-named core perl functions relating to IO operations, but yield their
41             results asynchronously via L instances.
42              
43             This is provided primarily as a decoupling mechanism, to allow modules to be
44             written that perform IO in an asynchronous manner to depend directly on this,
45             while allowing asynchronous event systems to provide an implementation of
46             these operations.
47              
48             =head2 Default Implementation
49              
50             If the C method is not invoked, a default implementation of
51             these operations is provided. This implementation allows a single queue of
52             C or C calls on a single filehandle only, combined with
53             C calls. It does not support C.
54              
55             It is provided for the simple cases where modules only need one filehandle
56             (most likely a single network socket or hardware device handle), allowing such
57             modules to work without needing a better event system.
58              
59             If there are both read/write and C futures pending, the implementation
60             will use C to wait for either. This may be problematic on MSWin32,
61             depending on what type of filehandle is involved.
62              
63             For cases where multiple filehandles are required, or for doing more involved
64             IO operations, a real implementation based on an actual event loop should be
65             provided. The following are known to exist; CPAN may provide others:
66              
67             =over 4
68              
69             =item *
70              
71             L
72              
73             =item *
74              
75             L
76              
77             =item *
78              
79             L
80              
81             =back
82              
83             =head2 Unit Testing
84              
85             The replaceable implementation is also useful for writing unit test scripts.
86             If the implementation is set to an instance of some sort of test fixture or
87             mocking object, a unit test can check that the appropriate IO operations
88             happen as part of the test.
89              
90             A testing module which does this is provided by L.
91              
92             =cut
93              
94             =head1 METHODS
95              
96             =cut
97              
98             =head2 accept
99              
100             $socketfh = await Future::IO->accept( $fh );
101              
102             I
103              
104             Returns a L that will become done when a new connection has been
105             accepted on the given filehandle, which should represent a listen-mode socket.
106             The returned future will yield the newly-accepted client socket filehandle.
107              
108             =cut
109              
110             sub accept
111             {
112 1     1 1 2 shift;
113 1         3 my ( $fh ) = @_;
114              
115 1   50     22 return ( $IMPL //= "Future::IO::_DefaultImpl" )->accept( $fh );
116             }
117              
118             =head2 alarm
119              
120             await Future::IO->alarm( $epoch );
121              
122             I
123              
124             Returns a L that will become done at a fixed point in the future,
125             given as an epoch timestamp (such as returned by C). This value may be
126             fractional.
127              
128             =cut
129              
130             sub alarm
131             {
132 1     1 1 3 shift;
133 1         2 my ( $epoch ) = @_;
134              
135 1   50     39 $IMPL //= "Future::IO::_DefaultImpl";
136              
137 1 50       17 if( $IMPL->can( "alarm" ) ) {
138 1         4 return $IMPL->alarm( $epoch );
139             }
140             else {
141 0         0 return $IMPL->sleep( $epoch - Time::HiRes::time() );
142             }
143             }
144              
145             =head2 connect
146              
147             await Future::IO->connect( $fh, $name );
148              
149             I
150              
151             Returns a L that will become done when a C has succeeded on
152             the given filehandle to the given sockname address.
153              
154             =cut
155              
156             sub connect
157             {
158 2     2 1 5 shift;
159 2         6 my ( $fh, $name ) = @_;
160              
161 2   100     23 return ( $IMPL //= "Future::IO::_DefaultImpl" )->connect( $fh, $name );
162             }
163              
164             =head2 sleep
165              
166             await Future::IO->sleep( $secs );
167              
168             Returns a L that will become done a fixed delay from now, given in
169             seconds. This value may be fractional.
170              
171             =cut
172              
173             sub sleep
174             {
175 9     9 1 261 shift;
176 9         23 my ( $secs ) = @_;
177              
178 9   100     55 return ( $IMPL //= "Future::IO::_DefaultImpl" )->sleep( $secs );
179             }
180              
181             =head2 sysread
182              
183             $bytes = await Future::IO->sysread( $fh, $length );
184              
185             Returns a L that will become done when at least one byte can be read
186             from the given filehandle. It may return up to C<$length> bytes. On EOF, the
187             returned future will yield an empty list (or C in scalar context). On
188             any error (other than C / C which are ignored), the
189             future fails with a suitable error message.
190              
191             Note specifically this may perform only a single C call, and thus
192             is not guaranteed to actually return the full length.
193              
194             =cut
195              
196             sub sysread
197             {
198 7     7 1 14346 shift;
199 7         23 my ( $fh, $length ) = @_;
200              
201 7   100     61 return ( $IMPL //= "Future::IO::_DefaultImpl" )->sysread( $fh, $length );
202             }
203              
204             =head2 sysread_exactly
205              
206             $bytes = await Future::IO->sysread_exactly( $fh, $length );
207              
208             I
209              
210             Returns a L that will become done when exactly the given number of
211             bytes have been read from the given filehandle. It returns exactly C<$length>
212             bytes. On EOF, the returned future will yield an empty list (or C in
213             scalar context), even if fewer bytes have already been obtained. These bytes
214             will be lost. On any error (other than C / C which are
215             ignored), the future fails with a suitable error message.
216              
217             This may make more than one C call.
218              
219             =cut
220              
221             sub sysread_exactly
222             {
223 2     2 1 8001 shift;
224 2         8 my ( $fh, $length ) = @_;
225              
226 2   100     12 $IMPL //= "Future::IO::_DefaultImpl";
227              
228 2 50       21 if( my $code = $IMPL->can( "sysread_exactly" ) ) {
229 0         0 return $IMPL->$code( $fh, $length );
230             }
231              
232 2         13 return _sysread_into_buffer( $IMPL, $fh, $length, \(my $buffer = '') );
233             }
234              
235             sub _sysread_into_buffer
236             {
237 7     7   16 my ( $IMPL, $fh, $length, $bufref ) = @_;
238              
239             $IMPL->sysread( $fh, $length - length $$bufref )->then( sub {
240 7     7   916 my ( $more ) = @_;
241 7 100       20 return Future->done() if !defined $more; # EOF
242              
243 6         13 $$bufref .= $more;
244              
245 6 100       19 return Future->done( $$bufref ) if length $$bufref >= $length;
246 5         11 return _sysread_into_buffer( $IMPL, $fh, $length, $bufref );
247 7         28 });
248             }
249              
250             =head2 sysread_until_eof
251              
252             $f = Future::IO->sysread_until_eof( $fh )
253              
254             I
255              
256             Returns a L that will become done when the given filehandle reaches
257             the EOF condition. The returned future will yield all of the bytes read up
258             until that point.
259              
260             =cut
261              
262             sub sysread_until_eof
263             {
264 1     1 1 226 shift;
265 1         4 my ( $fh ) = @_;
266              
267 1   50     8 $IMPL //= "Future::IO::_DefaultImpl";
268              
269 1         4 return _sysread_until_eof( $IMPL, $fh, \(my $buffer = '') );
270             }
271              
272             sub _sysread_until_eof
273             {
274 2     2   5 my ( $IMPL, $fh, $bufref ) = @_;
275              
276             $IMPL->sysread( $fh, $MAX_READLEN )->then( sub {
277 2     2   274 my ( $more ) = @_;
278 2 100       10 return Future->done( $$bufref ) if !defined $more;
279              
280 1         2 $$bufref .= $more;
281 1         3 return _sysread_until_eof( $IMPL, $fh, $bufref );
282 2         15 });
283             }
284              
285             =head2 syswrite
286              
287             $written_len = await Future::IO->syswrite( $fh, $bytes );
288              
289             I
290              
291             Returns a L that will become done when at least one byte has been
292             written to the given filehandle. It may write up to all of the bytes. On any
293             error (other than C / C which are ignored) the future
294             fails with a suitable error message.
295              
296             Note specifically this may perform only a single C call, and thus
297             is not guaranteed to actually return the full length.
298              
299             =cut
300              
301             sub syswrite
302             {
303 5     5 1 10 shift;
304 5         14 my ( $fh, $bytes ) = @_;
305              
306 5   100     39 return ( $IMPL //= "Future::IO::_DefaultImpl" )->syswrite( $fh, $bytes );
307             }
308              
309             =head2 syswrite_exactly
310              
311             $written_len = await Future::IO->syswrite_exactly( $fh, $bytes );
312              
313             I
314              
315             Returns a L that will become done when exactly the given bytes have
316             been written to the given filehandle. On any error (other than C /
317             C which are ignored) the future fails with a suitable error
318             message.
319              
320             This may make more than one C call.
321              
322             =cut
323              
324             sub syswrite_exactly
325             {
326 1     1 1 175 shift;
327 1         4 my ( $fh, $bytes ) = @_;
328              
329 1   50     9 $IMPL //= "Future::IO::_DefaultImpl";
330              
331 1 50       18 if( my $code = $IMPL->can( "syswrite_exactly" ) ) {
332 0         0 return $IMPL->$code( $fh, $bytes );
333             }
334              
335 1         52 return _syswrite_from_buffer( $IMPL, $fh, \$bytes, length $bytes );
336             }
337              
338             sub _syswrite_from_buffer
339             {
340 3     3   11 my ( $IMPL, $fh, $bufref, $len ) = @_;
341              
342             $IMPL->syswrite( $fh, substr $$bufref, 0, $MAX_WRITELEN )->then( sub {
343 3     3   426 my ( $written_len ) = @_;
344 3         11 substr $$bufref, 0, $written_len, "";
345              
346 3 100       22 return Future->done( $len ) if !length $$bufref;
347 2         7 return _syswrite_from_buffer( $IMPL, $fh, $bufref, $len );
348 3         18 });
349             }
350              
351             =head2 waitpid
352              
353             $wstatus = await Future::IO->waitpid( $pid );
354              
355             I
356              
357             Returns a L that will become done when the given child process
358             terminates. The future will yield the wait status of the child process.
359             This can be inspected by the usual bitshifting operations as per C<$?>:
360              
361             if( my $termsig = ($wstatus & 0x7f) ) {
362             say "Terminated with signal $termsig";
363             }
364             else {
365             my $exitcode = ($wstatus >> 8);
366             say "Terminated with exit code $exitcode";
367             }
368              
369             =cut
370              
371             sub waitpid
372             {
373 0     0 1 0 shift;
374 0         0 my ( $pid ) = @_;
375              
376 0   0     0 return ( $IMPL //= "Future::IO::_DefaultImpl" )->waitpid( $pid );
377             }
378              
379             =head2 override_impl
380              
381             Future::IO->override_impl( $impl );
382              
383             Sets a new implementation for C, replacing the minimal default
384             internal implementation. This can either be a package name or an object
385             instance reference, but must provide the methods named above.
386              
387             This method is intended to be called by event loops and other similar places,
388             to provide a better integration. Another way, which doesn't involve directly
389             depending on C or loading it, is to use the C<$IMPL> variable; see
390             below.
391              
392             Can only be called once, and only if the default implementation is not in use,
393             therefore a module that wishes to override this ought to invoke it as soon as
394             possible on program startup, before any of the main C methods may
395             have been called.
396              
397             =cut
398              
399             my $overridden;
400              
401             sub override_impl
402             {
403 1     1 1 88 shift;
404 1 50       4 croak "Future::IO implementation is already overridden" if defined $IMPL;
405 1 50 33     8 croak "Future::IO implementation cannot be set once default is already in use"
406             if @alarms or @readers;
407              
408 1         5 ( $IMPL ) = @_;
409             }
410              
411             =head2 HAVE_MULTIPLE_FILEHANDLES
412              
413             $has = Future::IO->HAVE_MULTIPLE_FILEHANDLES;
414              
415             I
416              
417             Returns true if the underlying IO implementation actually supports multiple
418             filehandles. Most real support modules will return true here, but this returns
419             false for the internal minimal implementation.
420              
421             =cut
422              
423             sub HAVE_MULTIPLE_FILEHANDLES
424             {
425 0   0 0 1 0 return ( $IMPL //= "Future::IO::_DefaultImpl" )->HAVE_MULTIPLE_FILEHANDLES;
426             }
427              
428             package
429             Future::IO::_DefaultImpl;
430 13     13   110 use base qw( Future::IO::ImplBase );
  13         25  
  13         6096  
431 13     13   95 use Carp;
  13         44  
  13         739  
432              
433 13     13   6418 use Struct::Dumb qw( readonly_struct );
  13         34854  
  13         60  
434 13     13   860 use Time::HiRes qw( time );
  13         28  
  13         111  
435              
436             readonly_struct Alarm => [qw( time f )];
437              
438             readonly_struct Reader => [qw( fh f )];
439             readonly_struct Writer => [qw( fh f )];
440              
441 13     13   1918 use constant HAVE_MULTIPLE_FILEHANDLES => 0;
  13         41  
  13         7919  
442              
443             sub alarm
444             {
445 1     1   20 my $class = shift;
446 1         7 return $class->_done_at( shift );
447             }
448              
449             sub sleep
450             {
451 7     7   14 my $class = shift;
452 7         34 return $class->_done_at( time() + shift );
453             }
454              
455             sub ready_for_read
456             {
457 16     16   27 my $class = shift;
458 16         26 my ( $fh ) = @_;
459              
460 16 50 66     54 croak "This implementation can only cope with a single pending filehandle in ->syread"
461             if @readers and $readers[-1]->fh != $fh;
462              
463 16         106 my $f = Future::IO::_DefaultImpl::F->new;
464 16         153 push @readers, Reader( $fh, $f );
465              
466             $f->on_cancel( sub {
467 2     2   299 my $f = shift;
468              
469 2         6 my $idx = 0;
470 2   33     19 $idx++ while $idx < @readers and $readers[$idx]->f != $f;
471              
472 2         24 splice @readers, $idx, 1, ();
473 16         228 });
474              
475 16         470 return $f;
476             }
477              
478             sub ready_for_write
479             {
480 10     10   18 my $class = shift;
481 10         23 my ( $fh ) = @_;
482              
483 10 50 66     36 croak "This implementation can only cope with a single pending filehandle in ->syswrite"
484             if @writers and $writers[-1]->fh != $fh;
485              
486 10         87 my $f = Future::IO::_DefaultImpl::F->new;
487 10         116 push @writers, Writer( $fh, $f );
488              
489             $f->on_cancel( sub {
490 1     1   46 my $f = shift;
491              
492 1         2 my $idx = 0;
493 1   33     31 $idx++ while $idx < @writers and $writers[$idx]->f != $f;
494              
495 1         56 splice @writers, $idx, 1, ();
496 10         138 });
497              
498 10         329 return $f;
499             }
500              
501             sub waitpid
502             {
503 0     0   0 croak "This implementation cannot handle waitpid";
504             }
505              
506             sub _done_at
507             {
508 8     8   13 shift;
509 8         20 my ( $time ) = @_;
510              
511 8         38 my $f = Future::IO::_DefaultImpl::F->new;
512              
513             # TODO: Binary search
514 8         62 my $idx = 0;
515 8   66     38 $idx++ while $idx < @alarms and $alarms[$idx]->time < $time;
516              
517 8         47 splice @alarms, $idx, 0, Alarm( $time, $f );
518              
519             $f->on_cancel( sub {
520 2     2   200 my $self = shift;
521              
522 2         4 my $idx = 0;
523 2   33     36 $idx++ while $idx < @alarms and $alarms[$idx]->f != $f;
524              
525 2         28 splice @alarms, $idx, 1, ();
526 8         121 } );
527              
528 8         229 return $f;
529             }
530              
531             package # hide
532             Future::IO::_DefaultImpl::F;
533 13     13   125 use base qw( Future );
  13         31  
  13         8912  
534 13     13   157350 use Time::HiRes qw( time );
  13         38  
  13         91  
535              
536             sub _await_once
537             {
538 29 50 100 29   397 die "Cowardly refusing to sit idle and do nothing" unless @alarms || @readers || @writers;
      66        
539              
540             # If we always select() then problematic platforms like MSWin32 would
541             # always break. Instead, we'll only select() if we're waiting on more than
542             # one of alarm, reader, writer. If not we'll just presume the one operation
543             # we're waiting for is definitely ready right now.
544 29   66     139 my $do_select = @alarms || ( @readers && @writers );
545              
546 29         82 my $rready;
547             my $wready;
548              
549             redo_select:
550 29 100       94 if( $do_select ) {
551 7         19 my $rvec = '';
552 7 100       27 vec( $rvec, $readers[0]->fh->fileno, 1 ) = 1 if @readers;
553              
554 7         47 my $wvec = '';
555 7 50       20 vec( $wvec, $writers[0]->fh->fileno, 1 ) = 1 if @writers;
556              
557 7         10 my $maxwait;
558 7 50       32 $maxwait = $alarms[0]->time - time() if @alarms;
559              
560 7         1200580 my $ret = select( $rvec, $wvec, undef, $maxwait );
561              
562 7   66     137 $rready = $ret && @readers && vec( $rvec, $readers[0]->fh->fileno, 1 );
563 7   33     64 $wready = $ret && @writers && vec( $wvec, $writers[0]->fh->fileno, 1 );
564             }
565             else {
566 22         45 $rready = !!@readers;
567 22         41 $wready = !!@writers;
568             }
569              
570 29 100       80 if( $rready ) {
571 14         51 ( shift @readers )->f->done;
572             }
573 29 100       2789 if( $wready ) {
574 9         32 ( shift @writers )->f->done;
575             }
576              
577 29         1725 my $now = time();
578 29   100     269 while( @alarms and $alarms[0]->time <= $now ) {
579 6         192 ( shift @alarms )->f->done;
580             }
581             }
582              
583             =head2 await
584              
585             $f = $f->await;
586              
587             I
588              
589             Blocks until this future is ready (either by success or failure). Does not
590             throw an exception if failed.
591              
592             =cut
593              
594             sub await
595             {
596 20     20   1405 my $self = shift;
597 20         71 _await_once until $self->is_ready;
598 20         757 return $self;
599             }
600              
601             =head1 THE C<$IMPL> VARIABLE
602              
603             I
604              
605             As an alternative to setting an implementation by using L, a
606             package variable is also available that allows modules such as event systems
607             to opportunistically provide an implementation without needing to depend on
608             the module, or loading it C. Simply directly set that package
609             variable to the name of an implementing package or an object instance.
610              
611             Additionally, implementors may use a name within the C
612             namespace, suffixed by the name of their event system.
613              
614             For example, something like the following code arrangement is recommended.
615              
616             package Future::IO::Impl::BananaLoop;
617              
618             {
619             no warnings 'once';
620             ( $Future::IO::IMPL //= __PACKAGE__ ) eq __PACKAGE__ or
621             warn "Unable to set Future::IO implementation to " . __PACKAGE__ .
622             " as it is already $Future::IO::IMPL\n";
623             }
624              
625             sub sleep
626             {
627             ...
628             }
629              
630             sub sysread
631             {
632             ...
633             }
634              
635             sub syswrite
636             {
637             ...
638             }
639              
640             sub waitpid
641             {
642             ...
643             }
644              
645             Optionally, you can also implement L and
646             L:
647              
648             sub sysread_exactly
649             {
650             ...
651             }
652              
653             sub syswrite_exactly
654             {
655             ...
656             }
657              
658             If not, they will be emulated by C itself, making multiple calls
659             to the non-C<_exactly> versions.
660              
661             =head1 AUTHOR
662              
663             Paul Evans
664              
665             =cut
666              
667             0x55AA;