File Coverage

blib/lib/Future/IO.pm
Criterion Covered Total %
statement 149 157 94.9
branch 26 36 72.2
condition 37 61 60.6
subroutine 36 39 92.3
pod 12 12 100.0
total 260 305 85.2


line stmt bran cond sub pod time code
1             # You may distribute under the terms of either the GNU General Public License
2             # or the Artistic License (the same terms as Perl itself)
3             #
4             # (C) Paul Evans, 2019-2023 -- leonerd@leonerd.org.uk
5              
6             package Future::IO 0.12;
7              
8 13     13   436634 use v5.14;
  13         103  
9 13     13   69 use warnings;
  13         24  
  13         341  
10              
11 13     13   64 use Carp;
  13         30  
  13         15444  
12              
13             # These need to be visible to sub override_impl
14             my @alarms;
15             my @readers;
16             my @writers;
17              
18             our $IMPL;
19              
20             our $MAX_READLEN = 8192;
21             our $MAX_WRITELEN = 8192;
22              
23             =head1 NAME
24              
25             C - Future-returning IO methods
26              
27             =head1 SYNOPSIS
28              
29             use Future::IO;
30              
31             my $delay = Future::IO->sleep( 5 );
32             # $delay will become done in 5 seconds time
33              
34             my $input = Future::IO->sysread( \*STDIN, 4096 );
35             # $input will yield some input from the STDIN IO handle
36              
37             =head1 DESCRIPTION
38              
39             This package provides a few basic methods that behave similarly to the
40             same-named core perl functions relating to IO operations, but yield their
41             results asynchronously via L instances.
42              
43             This is provided primarily as a decoupling mechanism, to allow modules to be
44             written that perform IO in an asynchronous manner to depend directly on this,
45             while allowing asynchronous event systems to provide an implementation of
46             these operations.
47              
48             =head2 Default Implementation
49              
50             If the C method is not invoked, a default implementation of
51             these operations is provided. This implementation allows a single queue of
52             C or C calls on a single filehandle only, combined with
53             C calls. It does not support C.
54              
55             It is provided for the simple cases where modules only need one filehandle
56             (most likely a single network socket or hardware device handle), allowing such
57             modules to work without needing a better event system.
58              
59             If there are both read/write and C futures pending, the implementation
60             will use C to wait for either. This may be problematic on MSWin32,
61             depending on what type of filehandle is involved.
62              
63             For cases where multiple filehandles are required, or for doing more involved
64             IO operations, a real implementation based on an actual event loop should be
65             provided. The following are known to exist; CPAN may provide others:
66              
67             =over 4
68              
69             =item *
70              
71             L
72              
73             =item *
74              
75             L
76              
77             =item *
78              
79             L
80              
81             =back
82              
83             =head2 Unit Testing
84              
85             The replaceable implementation is also useful for writing unit test scripts.
86             If the implementation is set to an instance of some sort of test fixture or
87             mocking object, a unit test can check that the appropriate IO operations
88             happen as part of the test.
89              
90             A testing module which does this is provided by L.
91              
92             =cut
93              
94             =head1 METHODS
95              
96             =cut
97              
98             =head2 accept
99              
100             $socketfh = await Future::IO->accept( $fh );
101              
102             I
103              
104             Returns a L that will become done when a new connection has been
105             accepted on the given filehandle, which should represent a listen-mode socket.
106             The returned future will yield the newly-accepted client socket filehandle.
107              
108             =cut
109              
110             sub accept
111             {
112 1     1 1 3 shift;
113 1         3 my ( $fh ) = @_;
114              
115 1   50     15 return ( $IMPL //= "Future::IO::_DefaultImpl" )->accept( $fh );
116             }
117              
118             =head2 alarm
119              
120             await Future::IO->alarm( $epoch );
121              
122             I
123              
124             Returns a L that will become done at a fixed point in the future,
125             given as an epoch timestamp (such as returned by C). This value may be
126             fractional.
127              
128             =cut
129              
130             sub alarm
131             {
132 1     1 1 5 shift;
133 1         58 my ( $epoch ) = @_;
134              
135 1   50     9 $IMPL //= "Future::IO::_DefaultImpl";
136              
137 1 50       57 if( $IMPL->can( "alarm" ) ) {
138 1         11 return $IMPL->alarm( $epoch );
139             }
140             else {
141 0         0 return $IMPL->sleep( $epoch - Time::HiRes::time() );
142             }
143             }
144              
145             =head2 connect
146              
147             await Future::IO->connect( $fh, $name );
148              
149             I
150              
151             Returns a L that will become done when a C has succeeded on
152             the given filehandle to the given sockname address.
153              
154             =cut
155              
156             sub connect
157             {
158 2     2 1 4 shift;
159 2         8 my ( $fh, $name ) = @_;
160              
161 2   100     24 return ( $IMPL //= "Future::IO::_DefaultImpl" )->connect( $fh, $name );
162             }
163              
164             =head2 sleep
165              
166             await Future::IO->sleep( $secs );
167              
168             Returns a L that will become done a fixed delay from now, given in
169             seconds. This value may be fractional.
170              
171             =cut
172              
173             sub sleep
174             {
175 9     9 1 278 shift;
176 9         30 my ( $secs ) = @_;
177              
178 9   100     64 return ( $IMPL //= "Future::IO::_DefaultImpl" )->sleep( $secs );
179             }
180              
181             =head2 sysread
182              
183             $bytes = await Future::IO->sysread( $fh, $length );
184              
185             Returns a L that will become done when at least one byte can be read
186             from the given filehandle. It may return up to C<$length> bytes. On EOF, the
187             returned future will yield an empty list (or C in scalar context). On
188             any error (other than C / C which are ignored), the
189             future fails with a suitable error message.
190              
191             Note specifically this may perform only a single C call, and thus
192             is not guaranteed to actually return the full length.
193              
194             =cut
195              
196             sub sysread
197             {
198 7     7 1 2355 shift;
199 7         17 my ( $fh, $length ) = @_;
200              
201 7   100     59 return ( $IMPL //= "Future::IO::_DefaultImpl" )->sysread( $fh, $length );
202             }
203              
204             =head2 sysread_exactly
205              
206             $bytes = await Future::IO->sysread_exactly( $fh, $length );
207              
208             I
209              
210             Returns a L that will become done when exactly the given number of
211             bytes have been read from the given filehandle. It returns exactly C<$length>
212             bytes. On EOF, the returned future will yield an empty list (or C in
213             scalar context), even if fewer bytes have already been obtained. These bytes
214             will be lost. On any error (other than C / C which are
215             ignored), the future fails with a suitable error message.
216              
217             This may make more than one C call.
218              
219             =cut
220              
221             sub sysread_exactly
222             {
223 2     2 1 1738 shift;
224 2         5 my ( $fh, $length ) = @_;
225              
226 2   100     11 $IMPL //= "Future::IO::_DefaultImpl";
227              
228 2 50       20 if( my $code = $IMPL->can( "sysread_exactly" ) ) {
229 0         0 return $IMPL->$code( $fh, $length );
230             }
231              
232 2         6 return _sysread_into_buffer( $IMPL, $fh, $length, \(my $buffer = '') );
233             }
234              
235             sub _sysread_into_buffer
236             {
237 7     7   16 my ( $IMPL, $fh, $length, $bufref ) = @_;
238              
239             $IMPL->sysread( $fh, $length - length $$bufref )->then( sub {
240 7     7   859 my ( $more ) = @_;
241 7 100       20 return Future->done() if !defined $more; # EOF
242              
243 6         12 $$bufref .= $more;
244              
245 6 100       17 return Future->done( $$bufref ) if length $$bufref >= $length;
246 5         10 return _sysread_into_buffer( $IMPL, $fh, $length, $bufref );
247 7         28 });
248             }
249              
250             =head2 sysread_until_eof
251              
252             $f = Future::IO->sysread_until_eof( $fh )
253              
254             I
255              
256             Returns a L that will become done when the given filehandle reaches
257             the EOF condition. The returned future will yield all of the bytes read up
258             until that point.
259              
260             =cut
261              
262             sub sysread_until_eof
263             {
264 1     1 1 245 shift;
265 1         5 my ( $fh ) = @_;
266              
267 1   50     9 $IMPL //= "Future::IO::_DefaultImpl";
268              
269 1         3 return _sysread_until_eof( $IMPL, $fh, \(my $buffer = '') );
270             }
271              
272             sub _sysread_until_eof
273             {
274 2     2   6 my ( $IMPL, $fh, $bufref ) = @_;
275              
276             $IMPL->sysread( $fh, $MAX_READLEN )->then( sub {
277 2     2   285 my ( $more ) = @_;
278 2 100       10 return Future->done( $$bufref ) if !defined $more;
279              
280 1         3 $$bufref .= $more;
281 1         3 return _sysread_until_eof( $IMPL, $fh, $bufref );
282 2         20 });
283             }
284              
285             =head2 syswrite
286              
287             $written_len = await Future::IO->syswrite( $fh, $bytes );
288              
289             I
290              
291             Returns a L that will become done when at least one byte has been
292             written to the given filehandle. It may write up to all of the bytes. On any
293             error (other than C / C which are ignored) the future
294             fails with a suitable error message.
295              
296             Note specifically this may perform only a single C call, and thus
297             is not guaranteed to actually return the full length.
298              
299             =cut
300              
301             sub syswrite
302             {
303 5     5 1 10 shift;
304 5         13 my ( $fh, $bytes ) = @_;
305              
306 5   100     34 return ( $IMPL //= "Future::IO::_DefaultImpl" )->syswrite( $fh, $bytes );
307             }
308              
309             =head2 syswrite_exactly
310              
311             $written_len = await Future::IO->syswrite_exactly( $fh, $bytes );
312              
313             I
314              
315             Returns a L that will become done when exactly the given bytes have
316             been written to the given filehandle. On any error (other than C /
317             C which are ignored) the future fails with a suitable error
318             message.
319              
320             This may make more than one C call.
321              
322             =cut
323              
324             sub syswrite_exactly
325             {
326 1     1 1 142 shift;
327 1         4 my ( $fh, $bytes ) = @_;
328              
329 1   50     9 $IMPL //= "Future::IO::_DefaultImpl";
330              
331 1 50       16 if( my $code = $IMPL->can( "syswrite_exactly" ) ) {
332 0         0 return $IMPL->$code( $fh, $bytes );
333             }
334              
335 1         6 return _syswrite_from_buffer( $IMPL, $fh, \$bytes, length $bytes );
336             }
337              
338             sub _syswrite_from_buffer
339             {
340 3     3   7 my ( $IMPL, $fh, $bufref, $len ) = @_;
341              
342             $IMPL->syswrite( $fh, substr $$bufref, 0, $MAX_WRITELEN )->then( sub {
343 3     3   454 my ( $written_len ) = @_;
344 3         8 substr $$bufref, 0, $written_len, "";
345              
346 3 100       13 return Future->done( $len ) if !length $$bufref;
347 2         5 return _syswrite_from_buffer( $IMPL, $fh, $bufref, $len );
348 3         17 });
349             }
350              
351             =head2 waitpid
352              
353             $wstatus = await Future::IO->waitpid( $pid );
354              
355             I
356              
357             Returns a L that will become done when the given child process
358             terminates. The future will yield the wait status of the child process.
359             This can be inspected by the usual bitshifting operations as per C<$?>:
360              
361             if( my $termsig = ($wstatus & 0x7f) ) {
362             say "Terminated with signal $termsig";
363             }
364             else {
365             my $exitcode = ($wstatus >> 8);
366             say "Terminated with exit code $exitcode";
367             }
368              
369             =cut
370              
371             sub waitpid
372             {
373 0     0 1 0 shift;
374 0         0 my ( $pid ) = @_;
375              
376 0   0     0 return ( $IMPL //= "Future::IO::_DefaultImpl" )->waitpid( $pid );
377             }
378              
379             =head2 override_impl
380              
381             Future::IO->override_impl( $impl );
382              
383             Sets a new implementation for C, replacing the minimal default
384             internal implementation. This can either be a package name or an object
385             instance reference, but must provide the methods named above.
386              
387             This method is intended to be called by event loops and other similar places,
388             to provide a better integration. Another way, which doesn't involve directly
389             depending on C or loading it, is to use the C<$IMPL> variable; see
390             below.
391              
392             Can only be called once, and only if the default implementation is not in use,
393             therefore a module that wishes to override this ought to invoke it as soon as
394             possible on program startup, before any of the main C methods may
395             have been called.
396              
397             =cut
398              
399             my $overridden;
400              
401             sub override_impl
402             {
403 1     1 1 94 shift;
404 1 50       4 croak "Future::IO implementation is already overridden" if defined $IMPL;
405 1 50 33     11 croak "Future::IO implementation cannot be set once default is already in use"
406             if @alarms or @readers;
407              
408 1         4 ( $IMPL ) = @_;
409             }
410              
411             =head2 HAVE_MULTIPLE_FILEHANDLES
412              
413             $has = Future::IO->HAVE_MULTIPLE_FILEHANDLES;
414              
415             I
416              
417             Returns true if the underlying IO implementation actually supports multiple
418             filehandles. Most real support modules will return true here, but this returns
419             false for the internal minimal implementation.
420              
421             =cut
422              
423             sub HAVE_MULTIPLE_FILEHANDLES
424             {
425 0   0 0 1 0 return ( $IMPL //= "Future::IO::_DefaultImpl" )->HAVE_MULTIPLE_FILEHANDLES;
426             }
427              
428             package
429             Future::IO::_DefaultImpl;
430 13     13   108 use base qw( Future::IO::ImplBase );
  13         21  
  13         5832  
431 13     13   89 use Carp;
  13         30  
  13         747  
432              
433 13     13   6314 use Struct::Dumb qw( readonly_struct );
  13         34307  
  13         54  
434 13     13   4958 use Time::HiRes qw( time );
  13         11261  
  13         58  
435              
436             readonly_struct Alarm => [qw( time f )];
437              
438             readonly_struct Reader => [qw( fh f )];
439             readonly_struct Writer => [qw( fh f )];
440              
441 13     13   2587 use constant HAVE_MULTIPLE_FILEHANDLES => 0;
  13         26  
  13         8483  
442              
443             sub alarm
444             {
445 1     1   5 my $class = shift;
446 1         7 return $class->_done_at( shift );
447             }
448              
449             sub sleep
450             {
451 7     7   22 my $class = shift;
452 7         42 return $class->_done_at( time() + shift );
453             }
454              
455             sub ready_for_read
456             {
457 16     16   27 my $class = shift;
458 16         32 my ( $fh ) = @_;
459              
460 16 50 66     53 croak "This implementation can only cope with a single pending filehandle in ->syread"
461             if @readers and $readers[-1]->fh != $fh;
462              
463 16         100 my $f = Future::IO::_DefaultImpl::F->new;
464 16         165 push @readers, Reader( $fh, $f );
465              
466             $f->on_cancel( sub {
467 2     2   458 my $f = shift;
468              
469 2         7 my $idx = 0;
470 2   33     22 $idx++ while $idx < @readers and $readers[$idx]->f != $f;
471              
472 2         36 splice @readers, $idx, 1, ();
473 16         216 });
474              
475 16         509 return $f;
476             }
477              
478             sub ready_for_write
479             {
480 10     10   19 my $class = shift;
481 10         19 my ( $fh ) = @_;
482              
483 10 50 66     49 croak "This implementation can only cope with a single pending filehandle in ->syswrite"
484             if @writers and $writers[-1]->fh != $fh;
485              
486 10         97 my $f = Future::IO::_DefaultImpl::F->new;
487 10         118 push @writers, Writer( $fh, $f );
488              
489             $f->on_cancel( sub {
490 1     1   80 my $f = shift;
491              
492 1         4 my $idx = 0;
493 1   33     21 $idx++ while $idx < @writers and $writers[$idx]->f != $f;
494              
495 1         12 splice @writers, $idx, 1, ();
496 10         142 });
497              
498 10         323 return $f;
499             }
500              
501             sub waitpid
502             {
503 0     0   0 croak "This implementation cannot handle waitpid";
504             }
505              
506             sub _done_at
507             {
508 8     8   17 shift;
509 8         25 my ( $time ) = @_;
510              
511 8         42 my $f = Future::IO::_DefaultImpl::F->new;
512              
513             # TODO: Binary search
514 8         87 my $idx = 0;
515 8   66     50 $idx++ while $idx < @alarms and $alarms[$idx]->time < $time;
516              
517 8         68 splice @alarms, $idx, 0, Alarm( $time, $f );
518              
519             $f->on_cancel( sub {
520 2     2   252 my $self = shift;
521              
522 2         6 my $idx = 0;
523 2   33     20 $idx++ while $idx < @alarms and $alarms[$idx]->f != $f;
524              
525 2         34 splice @alarms, $idx, 1, ();
526 8         151 } );
527              
528 8         292 return $f;
529             }
530              
531             package # hide
532             Future::IO::_DefaultImpl::F;
533 13     13   118 use base qw( Future );
  13         24  
  13         8851  
534 13     13   179027 use Time::HiRes qw( time );
  13         29  
  13         78  
535              
536             sub _await_once
537             {
538 29 50 100 29   445 die "Cowardly refusing to sit idle and do nothing" unless @alarms || @readers || @writers;
      66        
539              
540             # If we always select() then problematic platforms like MSWin32 would
541             # always break. Instead, we'll only select() if we're waiting on more than
542             # one of alarm, reader, writer. If not we'll just presume the one operation
543             # we're waiting for is definitely ready right now.
544 29   66     134 my $do_select = @alarms || ( @readers && @writers );
545              
546 29         134 my $rready;
547             my $wready;
548              
549             redo_select:
550 29 100       80 if( $do_select ) {
551 7         17 my $rvec = '';
552 7 100       27 vec( $rvec, $readers[0]->fh->fileno, 1 ) = 1 if @readers;
553              
554 7         54 my $wvec = '';
555 7 50       22 vec( $wvec, $writers[0]->fh->fileno, 1 ) = 1 if @writers;
556              
557 7         16 my $maxwait;
558 7 50       36 $maxwait = $alarms[0]->time - time() if @alarms;
559              
560 7         1200639 my $ret = select( $rvec, $wvec, undef, $maxwait );
561              
562 7   66     169 $rready = $ret && @readers && vec( $rvec, $readers[0]->fh->fileno, 1 );
563 7   33     84 $wready = $ret && @writers && vec( $wvec, $writers[0]->fh->fileno, 1 );
564             }
565             else {
566 22         45 $rready = !!@readers;
567 22         37 $wready = !!@writers;
568             }
569              
570 29 100       102 if( $rready ) {
571 14         46 ( shift @readers )->f->done;
572             }
573 29 100       2590 if( $wready ) {
574 9         30 ( shift @writers )->f->done;
575             }
576              
577 29         1641 my $now = time();
578 29   100     325 while( @alarms and $alarms[0]->time <= $now ) {
579 6         259 ( shift @alarms )->f->done;
580             }
581             }
582              
583             =head2 await
584              
585             $f = $f->await;
586              
587             I
588              
589             Blocks until this future is ready (either by success or failure). Does not
590             throw an exception if failed.
591              
592             =cut
593              
594             sub await
595             {
596 20     20   1485 my $self = shift;
597 20         74 _await_once until $self->is_ready;
598 20         1042 return $self;
599             }
600              
601             =head1 THE C<$IMPL> VARIABLE
602              
603             I
604              
605             As an alternative to setting an implementation by using L, a
606             package variable is also available that allows modules such as event systems
607             to opportunistically provide an implementation without needing to depend on
608             the module, or loading it C. Simply directly set that package
609             variable to the name of an implementing package or an object instance.
610              
611             Additionally, implementors may use a name within the C
612             namespace, suffixed by the name of their event system.
613              
614             For example, something like the following code arrangement is recommended.
615              
616             package Future::IO::Impl::BananaLoop;
617              
618             {
619             no warnings 'once';
620             ( $Future::IO::IMPL //= __PACKAGE__ ) eq __PACKAGE__ or
621             warn "Unable to set Future::IO implementation to " . __PACKAGE__ .
622             " as it is already $Future::IO::IMPL\n";
623             }
624              
625             sub sleep
626             {
627             ...
628             }
629              
630             sub sysread
631             {
632             ...
633             }
634              
635             sub syswrite
636             {
637             ...
638             }
639              
640             sub waitpid
641             {
642             ...
643             }
644              
645             Optionally, you can also implement L and
646             L:
647              
648             sub sysread_exactly
649             {
650             ...
651             }
652              
653             sub syswrite_exactly
654             {
655             ...
656             }
657              
658             If not, they will be emulated by C itself, making multiple calls
659             to the non-C<_exactly> versions.
660              
661             =head1 AUTHOR
662              
663             Paul Evans
664              
665             =cut
666              
667             0x55AA;