File Coverage

blib/lib/Future/Utils.pm
Criterion Covered Total %
statement 160 164 97.5
branch 54 58 93.1
condition 37 55 67.2
subroutine 32 33 96.9
pod 7 9 77.7
total 290 319 90.9


line stmt bran cond sub pod time code
1             # You may distribute under the terms of either the GNU General Public License
2             # or the Artistic License (the same terms as Perl itself)
3             #
4             # (C) Paul Evans, 2013-2016 -- leonerd@leonerd.org.uk
5              
6             package Future::Utils;
7              
8 8     8   3966 use v5.10;
  8         24  
9 8     8   33 use strict;
  8         12  
  8         159  
10 8     8   32 use warnings;
  8         9  
  8         303  
11              
12             our $VERSION = '0.48_003';
13              
14 8     8   45 use Exporter 'import';
  8         17  
  8         790  
15             # Can't import the one from Exporter as it relies on package inheritance
16             sub export_to_level
17             {
18 0     0 0 0 my $pkg = shift; local $Exporter::ExportLevel = 1 + shift; $pkg->import(@_);
  0         0  
  0         0  
19             }
20              
21             our @EXPORT_OK = qw(
22             call
23             call_with_escape
24              
25             repeat
26             try_repeat try_repeat_until_success
27             repeat_until_success
28              
29             fmap fmap_concat
30             fmap1 fmap_scalar
31             fmap0 fmap_void
32             );
33              
34 8     8   47 use Carp;
  8         12  
  8         545  
35             our @CARP_NOT = qw( Future );
36              
37 8     8   51 use Future;
  8         22  
  8         13002  
38              
39             =head1 NAME
40              
41             C<Future::Utils> - utility functions for working with C<Future> objects
42              
43             =head1 SYNOPSIS
44              
45             use Future::Utils qw( call_with_escape );
46              
47             my $result_f = call_with_escape {
48             my $escape_f = shift;
49             my $f = ...
50             $escape_f->done( "immediate result" );
51             ...
52             };
53              
54             Z<>
55              
56             use Future::Utils qw( repeat try_repeat try_repeat_until_success );
57              
58             my $eventual_f = repeat {
59             my $trial_f = ...
60             return $trial_f;
61             } while => sub { my $f = shift; return want_more($f) };
62              
63             my $eventual_f = repeat {
64             ...
65             return $trial_f;
66             } until => sub { my $f = shift; return acceptable($f) };
67              
68             my $eventual_f = repeat {
69             my $item = shift;
70             ...
71             return $trial_f;
72             } foreach => \@items;
73              
74             my $eventual_f = try_repeat {
75             my $trial_f = ...
76             return $trial_f;
77             } while => sub { ... };
78              
79             my $eventual_f = try_repeat_until_success {
80             ...
81             return $trial_f;
82             };
83              
84             my $eventual_f = try_repeat_until_success {
85             my $item = shift;
86             ...
87             return $trial_f;
88             } foreach => \@items;
89              
90             Z<>
91              
92             use Future::Utils qw( fmap_concat fmap_scalar fmap_void );
93              
94             my $result_f = fmap_concat {
95             my $item = shift;
96             ...
97             return $item_f;
98             } foreach => \@items, concurrent => 4;
99              
100             my $result_f = fmap_scalar {
101             my $item = shift;
102             ...
103             return $item_f;
104             } foreach => \@items, concurrent => 8;
105              
106             my $done_f = fmap_void {
107             my $item = shift;
108             ...
109             return $item_f;
110             } foreach => \@items, concurrent => 10;
111              
112             Unless otherwise noted, the following functions require at least version
113             I<0.08>.
114              
115             =cut
116              
117             =head1 INVOKING A BLOCK OF CODE
118              
119             =head2 call
120              
121             $f = call { CODE }
122              
123             I
124              
125             The C<call> function invokes a block of code that returns a future, and simply
126             returns the future it returned. The code is wrapped in an C<eval {}> block, so
127             that if it throws an exception this is turned into an immediate failed
128             C<Future>. If the code does not return a C<Future>, then an immediate failed
129             C<Future> is returned instead.
130              
131             (This is equivalent to using C<< Future->call >>, but is duplicated here for
132             completeness).
133              
134             =cut
135              
136             sub call(&)
137             {
138 3     3 1 638 my ( $code ) = @_;
139 3         23 return Future->call( $code );
140             }
141              
142             =head2 call_with_escape
143              
144             $f = call_with_escape { CODE }
145              
146             I
147              
148             The C<call_with_escape> function invokes a block of code that returns a
149             future, and passes in a separate future (called here an "escape future").
150             Normally this is equivalent to the simple C<call> function. However, if the
151             code captures this future and completes it by calling C<done> or C<fail> on
152             it, the future returned by C<call_with_escape> immediately completes with this
153             result, and the future returned by the code itself is cancelled.
154              
155             This can be used to implement short-circuit return from an iterating loop or
156             complex sequence of code, or immediate fail that bypasses failure handling
157             logic in the code itself, or several other code patterns.
158              
159             $f = $code->( $escape_f )
160              
161             (This can be considered similar to C<call-with-escape-continuation> as found
162             in some Scheme implementations).
163              
164             =cut
165              
166             sub call_with_escape(&)
167             {
168 4     4 1 366 my ( $code ) = @_;
169              
170 4         17 my $escape_f = Future->new;
171              
172 4         22 return Future->wait_any(
173             Future->call( $code, $escape_f ),
174             $escape_f,
175             );
176             }
177              
178             =head1 REPEATING A BLOCK OF CODE
179              
180             The C<repeat> function provides a way to repeatedly call a block of code that
181             returns a L<Future> (called here a "trial future") until some ending condition
182             is satisfied. The C<repeat> function itself returns a C<Future> to represent
183             running the repeating loop until that end condition (called here the "eventual
184             future"). The first time the code block is called, it is passed no arguments,
185             and each subsequent invocation is passed the previous trial future.
186              
187             The result of the eventual future is the result of the last trial future.
188              
189             If the eventual future is cancelled, the latest trial future will be
190             cancelled.
191              
192             If some specific subclass or instance of C<Future> is required as the return
193             value, it can be passed as the C<return> argument. Otherwise the return value
194             will be constructed by cloning the first non-immediate trial C<Future>.
195              
196             =head2 repeat+while
197              
198             $future = repeat { CODE } while => CODE
199              
200             Repeatedly calls the C<CODE> block while the C<while> condition returns a true
201             value. Each time the trial future completes, the C<while> condition is passed
202             the trial future.
203              
204             $trial_f = $code->( $previous_trial_f )
205             $again = $while->( $trial_f )
206              
207             If the C<$code> block dies entirely and throws an exception, this will be
208             caught and considered as an immediately-failed C<Future> with the exception as
209             the future's failure. The exception will not be propagated to the caller.
210              
211             =head2 repeat+until
212              
213             $future = repeat { CODE } until => CODE
214              
215             Repeatedly calls the C<CODE> block until the C<until> condition returns a true
216             value. Each time the trial future completes, the C<until> condition is passed
217             the trial future.
218              
219             $trial_f = $code->( $previous_trial_f )
220             $accept = $until->( $trial_f )
221              
222             =head2 repeat+foreach
223              
224             $future = repeat { CODE } foreach => ARRAY, otherwise => CODE
225              
226             I
227              
228             Calls the C<CODE> block once for each value obtained from the array, passing
229             in the value as the first argument (before the previous trial future). When
230             there are no more items left in the array, the C<otherwise> code is invoked
231             once and passed the last trial future, if there was one, or C<undef> if the
232             list was originally empty. The result of the eventual future will be the
233             result of the future returned from C<otherwise>.
234              
235             The referenced array may be modified by this operation.
236              
237             $trial_f = $code->( $item, $previous_trial_f )
238             $final_f = $otherwise->( $last_trial_f )
239              
240             The C<otherwise> code is optional; if not supplied then the result of the
241             eventual future will simply be that of the last trial. If there was no trial,
242             because the C<foreach> list was already empty, then an immediate successful
243             future with an empty result is returned.
244              
245             =head2 repeat+foreach+while
246              
247             $future = repeat { CODE } foreach => ARRAY, while => CODE, ...
248              
249             I
250              
251             =head2 repeat+foreach+until
252              
253             $future = repeat { CODE } foreach => ARRAY, until => CODE, ...
254              
255             I
256              
257             Combines the effects of C<foreach> with C<while> or C<until>. Calls the
258             C<CODE> block once for each value obtained from the array, until the array is
259             exhausted or the given ending condition is satisfied.
260              
261             If a C<while> or C<until> condition is combined with C<otherwise>, the
262             C<otherwise> code will only be run if the array was entirely exhausted. If the
263             operation is terminated early due to the C<while> or C<until> condition being
264             satisfied, the eventual result will simply be that of the last trial that was
265             executed.
266              
267             =head2 repeat+generate
268              
269             $future = repeat { CODE } generate => CODE, otherwise => CODE
270              
271             I
272              
273             Calls the C<CODE> block once for each value obtained from the generator code,
274             passing in the value as the first argument (before the previous trial future).
275             When the generator returns an empty list, the C<otherwise> code is invoked and
276             passed the last trial future, if there was one, otherwise C<undef> if the
277             generator never returned a value. The result of the eventual future will be
278             the result of the future returned from C<otherwise>.
279              
280             $trial_f = $code->( $item, $previous_trial_f )
281             $final_f = $otherwise->( $last_trial_f )
282              
283             ( $item ) = $generate->()
284              
285             The generator is called in list context but should return only one item per
286             call. Subsequent values will be ignored. When it has no more items to return
287             it should return an empty list.
288              
289             For backward compatibility this function will allow a C<while> or C<until>
290             condition that requests a failure be repeated, but it will print a warning if
291             it has to do that. To apply repeating behaviour that can catch and retry
292             failures, use C<try_repeat> instead. This old behaviour is now deprecated and
293             will be removed in the next version.
294              
295             =cut
296              
297             sub _repeat
298             {
299 36     36   85 my ( $code, $return, $trialp, $cond, $sense, $is_try ) = @_;
300              
301 36         55 my $prev = $$trialp;
302              
303 36         42 while(1) {
304 64   66     210 my $trial = $$trialp ||= Future->call( $code, $prev );
305 64         74 $prev = $trial;
306              
307 64 100       178 if( !$trial->is_ready ) {
308             # defer
309 13   66     58 $return ||= $trial->new;
310             $trial->on_ready( sub {
311 12 100   12   46 return if $$trialp->is_cancelled;
312 11         24 _repeat( $code, $return, $trialp, $cond, $sense, $is_try );
313 13         99 });
314 13         51 return $return;
315             }
316              
317 51         64 my $stop;
318 51 100       58 if( not eval { $stop = !$cond->( $trial ) ^ $sense; 1 } ) {
  51         77  
  50         111  
319 1   33     7 $return ||= $trial->new;
320 1         3 $return->fail( $@ );
321 1         2 return $return;
322             }
323              
324 50 100       76 if( $stop ) {
325             # Return result
326 22   66     60 $return ||= $trial->new;
327 22         67 $trial->on_done( $return );
328 22         58 $trial->on_fail( $return );
329 22         71 return $return;
330             }
331              
332 28 100 100     71 if( !$is_try and $trial->failure ) {
333 2         217 carp "Using Future::Utils::repeat to retry a failure is deprecated; use try_repeat instead";
334             }
335              
336             # redo
337 28         143 undef $$trialp;
338             }
339             }
340              
341             sub repeat(&@)
342             {
343 25     25 0 4734 my $code = shift;
344 25         65 my %args = @_;
345              
346             # This makes it easier to account for other conditions
347             defined($args{while}) + defined($args{until}) == 1
348             or defined($args{foreach})
349             or defined($args{generate})
350 25 50 100     103 or croak "Expected one of 'while', 'until', 'foreach' or 'generate'";
      66        
351              
352 25 100       50 if( $args{foreach} ) {
353 11 50       17 $args{generate} and croak "Cannot use both 'foreach' and 'generate'";
354              
355 11         15 my $array = delete $args{foreach};
356             $args{generate} = sub {
357 24 100   24   45 @$array ? shift @$array : ();
358 11         31 };
359             }
360              
361 25 100       46 if( $args{generate} ) {
362 14         21 my $generator = delete $args{generate};
363 14         20 my $otherwise = delete $args{otherwise};
364              
365             # TODO: This is slightly messy as this lexical is captured by both
366             # blocks of code. Can we do better somehow?
367 14         15 my $done;
368              
369 14         17 my $orig_code = $code;
370             $code = sub {
371 33     33   50 my ( $last_trial_f ) = @_;
372 33         55 my $again = my ( $value ) = $generator->( $last_trial_f );
373              
374 33 100       69 if( $again ) {
375 23         42 unshift @_, $value; goto &$orig_code;
  23         63  
376             }
377              
378 10         17 $done++;
379 10 100       17 if( $otherwise ) {
380 6         18 goto &$otherwise;
381             }
382             else {
383 4   66     13 return $last_trial_f || Future->done;
384             }
385 14         45 };
386              
387 14 100       39 if( my $orig_while = delete $args{while} ) {
    100          
388             $args{while} = sub {
389 6 100   6   14 $orig_while->( $_[0] ) and !$done;
390 3         10 };
391             }
392             elsif( my $orig_until = delete $args{until} ) {
393             $args{while} = sub {
394 2   66 2   12 !$orig_until->( $_[0] ) and !$done;
395 1         3 };
396             }
397             else {
398 10     25   28 $args{while} = sub { !$done };
  25         54  
399             }
400             }
401              
402 25         886 my $future = $args{return};
403              
404 25         31 my $trial;
405 25 100       86 $args{while} and $future = _repeat( $code, $future, \$trial, $args{while}, 0, $args{try} );
406 25 100       72 $args{until} and $future = _repeat( $code, $future, \$trial, $args{until}, 1, $args{try} );
407              
408 25     1   108 $future->on_cancel( sub { $trial->cancel } );
  1         3  
409              
410 25         136 return $future;
411             }
412              
413             =head2 try_repeat
414              
415             $future = try_repeat { CODE } ...
416              
417             I
418              
419             A variant of C<repeat> that doesn't warn when the trial fails and the
420             condition code asks for it to be repeated.
421              
422             In some later version the C<repeat> function will be changed so that if a
423             trial future fails, then the eventual future will immediately fail as well,
424             making its semantics a little closer to that of a C<while {}> loop in Perl.
425             Code that specifically wishes to catch failures in trial futures and retry
426             the block should use C<try_repeat> specifically.
427              
428             =cut
429              
430             sub try_repeat(&@)
431             {
432             # defeat prototype
433 6     6 1 2163 &repeat( @_, try => 1 );
434             }
435              
436             =head2 try_repeat_until_success
437              
438             $future = try_repeat_until_success { CODE } ...
439              
440             I
441              
442             A shortcut to calling C<try_repeat> with an ending condition that simply tests
443             for a successful result from a future. May be combined with C<foreach> or
444             C<generate>.
445              
446             This function used to be called C<repeat_until_success>, and is currently
447             aliased as this name as well.
448              
449             =cut
450              
451             sub try_repeat_until_success(&@)
452             {
453 2     2 1 829 my $code = shift;
454 2         6 my %args = @_;
455              
456             # TODO: maybe merge while/until conditions one day...
457             defined($args{while}) or defined($args{until})
458 2 50 33     9 and croak "Cannot pass 'while' or 'until' to try_repeat_until_success";
459              
460             # defeat prototype
461 2     5   8 &try_repeat( $code, while => sub { shift->failure }, %args );
  5         11  
462             }
463              
464             # Legacy name
465             *repeat_until_success = \&try_repeat_until_success;
466              
467             =head1 APPLYING A FUNCTION TO A LIST
468              
469             The C<fmap> family of functions provide a way to call a block of code that
470             returns a L<Future> (called here an "item future") once per item in a given
471             list, or returned by a generator function. The C<fmap> functions themselves
472             return a C<Future> to represent the ongoing operation, which completes when
473             every item's future has completed.
474              
475             While this behaviour can also be implemented using C<repeat>, the main reason
476             to use an C<fmap> function is that the individual item operations are
477             considered as independent, and thus more than one can be outstanding
478             concurrently. An argument can be passed to the function to indicate how many
479             items to start initially, and thereafter it will keep that many of them
480             running concurrently until all of the items are done, or until any of them
481             fail. If an individual item future fails, the overall result future will be
482             marked as failing with the same failure, and any other pending item futures
483             that are outstanding at the time will be cancelled.
484              
485             The following named arguments are common to each C<fmap> function:
486              
487             =over 8
488              
489             =item foreach => ARRAY
490              
491             Provides the list of items to iterate over, as an C<ARRAY> reference.
492              
493             The referenced array will be modified by this operation, C<shift>ing one item
494             from it each time. The code block can C<push> more items to this array as it runs, and
495             they will be included in the iteration.
496              
497             =item generate => CODE
498              
499             Provides the list of items to iterate over, by calling the generator function
500             once for each required item. The function should return a single item, or an
501             empty list to indicate it has no more items.
502              
503             ( $item ) = $generate->()
504              
505             This function will be invoked each time any previous item future has completed
506             and may be called again even after it has returned empty.
507              
508             =item concurrent => INT
509              
510             Gives the number of item futures to keep outstanding. By default this value
511             will be 1 (i.e. no concurrency); larger values indicate that multiple item
512             futures will be started at once.
513              
514             =item return => Future
515              
516             Normally, a new instance is returned by cloning the first non-immediate future
517             returned as an item future. By passing a new instance as the C<return>
518             argument, the result will be put into the given instance. This can be used to
519             return subclasses, or specific instances.
520              
521             =back
522              
523             In each case, the main code block will be called once for each item in the
524             list, passing in the item as the only argument:
525              
526             $item_f = $code->( $item )
527              
528             The expected return value from each item's future, and the value returned from
529             the result future will differ in each function's case; they are documented
530             below.
531              
532             For similarity with perl's core C<map> function, the item is also available
533             aliased as C<$_>.
534              
535             =cut
536              
537             # This function is invoked in two circumstances:
538             # a) to create an item Future in a slot,
539             # b) once a non-immediate item Future is complete, to check its results
540             # It can tell which circumstance by whether the slot itself is defined or not
541             sub _fmap_slot
542             {
543 60     60   108 my ( $slots, undef, $code, $generator, $collect, $results, $return ) = @_;
544              
545 60         64 SLOT: while(1) {
546             # Capture args each call because we mutate them
547 99         178 my ( undef, $idx ) = my @args = @_;
548              
549 99 100       172 unless( $slots->[$idx] ) {
550             # No item Future yet (case a), so create one
551 69         70 my $item;
552 69 100       88 unless( ( $item ) = $generator->() ) {
553             # All out of items, so now just wait for the slots to be finished
554 26         31 undef $slots->[$idx];
555 26   100     82 defined and return $return for @$slots;
556              
557             # All the slots are done
558 11   66     23 $return ||= Future->new;
559              
560 11         27 $return->done( @$results );
561 11         25 return $return;
562             }
563              
564 43         130 my $f = $slots->[$idx] = Future->call( $code, local $_ = $item );
565              
566 43 100       101 if( $collect eq "array" ) {
    100          
567 10         13 push @$results, my $r = [];
568 10     9   39 $f->on_done( sub { @$r = @_ });
  9         23  
569             }
570             elsif( $collect eq "scalar" ) {
571 3         4 push @$results, undef;
572 3         5 my $r = \$results->[-1];
573 3     3   9 $f->on_done( sub { $$r = $_[0] });
  3         17  
574             }
575             }
576              
577 73         78 my $f = $slots->[$idx];
578              
579             # Slot is non-immediate; arrange for us to be invoked again later when it's ready
580 73 100       126 if( !$f->is_ready ) {
581 36   66     104 $args[-1] = ( $return ||= $f->new );
582 36     30   137 $f->on_done( sub { _fmap_slot( @args ) } );
  30         48  
583 36         80 $f->on_fail( $return );
584              
585             # Try looking for more that might be ready
586 36         43 my $i = $idx + 1;
587 36         68 while( $i != $idx ) {
588 40         46 $i++;
589 40         56 $i %= @$slots;
590 40 100       81 next if defined $slots->[$i];
591              
592 3         5 $_[1] = $i;
593 3         5 redo SLOT;
594             }
595 33         76 return $return;
596             }
597              
598             # Either we've been invoked again (case b), or the immediate Future was
599             # already ready.
600 37 100       101 if( $f->failure ) {
601 1   33     3 $return ||= $f->new;
602 1         3 $return->fail( $f->failure );
603 1         2 return $return;
604             }
605              
606 36         66 undef $slots->[$idx];
607             # next
608             }
609             }
610              
611             sub _fmap
612             {
613 15     15   18 my $code = shift;
614 15         30 my %args = @_;
615              
616 15   100     44 my $concurrent = $args{concurrent} || 1;
617 15         17 my @slots;
618              
619 15         21 my $results = [];
620 15         18 my $future = $args{return};
621              
622 15         18 my $generator;
623 15 100       36 if( $generator = $args{generate} ) {
    50          
624             # OK
625             }
626             elsif( my $array = $args{foreach} ) {
627 14 100   65   35 $generator = sub { return unless @$array; shift @$array };
  65         124  
  40         79  
628             }
629             else {
630 0         0 croak "Expected either 'generate' or 'foreach'";
631             }
632              
633             # If any of these immediately fail, don't bother continuing
634 15         56 foreach my $idx ( 0 .. $concurrent-1 ) {
635 30         59 $future = _fmap_slot( \@slots, $idx, $code, $generator, $args{collect}, $results, $future );
636 30 100       55 last if $future->is_ready;
637             }
638              
639             $future->on_fail( sub {
640 2   66 2   7 !defined $_ or $_->is_ready or $_->cancel for @slots;
      66        
641 15         58 });
642             $future->on_cancel( sub {
643 2   66 2   10 !defined $_ or $_->is_ready or $_->cancel for @slots;
      66        
644 15         65 });
645              
646 15         60 return $future;
647             }
648              
649             =head2 fmap_concat
650              
651             $future = fmap_concat { CODE } ...
652              
653             I
654              
655             This version of C<fmap> expects each item future to return a list of zero or
656             more values, and the overall result will be the concatenation of all these
657             results. It acts like a future-based equivalent to Perl's C<map> operator.
658              
659             The results are returned in the order of the original input values, not in the
660             order their futures complete in. Because of the intermediate storage of
661             C<ARRAY> references and final flattening operation used to implement this
662             behaviour, this function is slightly less efficient than C<fmap_scalar> or
663             C<fmap_void> in cases where item futures are expected only ever to return one,
664             or zero values, respectively.
665              
666             This function is also available under the name of simply C<fmap> to emphasise
667             its similarity to perl's C<map> keyword.
668              
669             =cut
670              
671             sub fmap_concat(&@)
672             {
673 4     4 1 1226 my $code = shift;
674 4         9 my %args = @_;
675              
676             _fmap( $code, %args, collect => "array" )->then( sub {
677 3     3   6 return Future->done( map { @$_ } @_ );
  9         33  
678 4         14 });
679             }
680             *fmap = \&fmap_concat;
681              
682             =head2 fmap_scalar
683              
684             $future = fmap_scalar { CODE } ...
685              
686             I
687              
688             This version of C<fmap> acts more like the C<map> functions found in Scheme or
689             Haskell; it expects that each item future returns only one value, and the
690             overall result will be a list containing these, in order of the original input
691             items. If an item future returns more than one value the others will be
692             discarded. If it returns no value, then C<undef> will be substituted in its
693             place so that the result list remains in correspondence with the input list.
694              
695             This function is also available under the shorter name of C<fmap1>.
696              
697             =cut
698              
699             sub fmap_scalar(&@)
700             {
701 1     1 1 356 my $code = shift;
702 1         3 my %args = @_;
703              
704 1         4 _fmap( $code, %args, collect => "scalar" )
705             }
706             *fmap1 = \&fmap_scalar;
707              
708             =head2 fmap_void
709              
710             $future = fmap_void { CODE } ...
711              
712             I
713              
714             This version of C<fmap> does not collect any results from its item futures, it
715             simply waits for them all to complete. Its result future will provide no
716             values.
717              
718             While not a map in the strictest sense, this variant is still useful as a way
719             to control concurrency of a function call iterating over a list of items,
720             obtaining its results by some other means (such as side-effects on captured
721             variables, or some external system).
722              
723             This function is also available under the shorter name of C<fmap0>.
724              
725             =cut
726              
727             sub fmap_void(&@)
728             {
729 10     10 1 2639 my $code = shift;
730 10         25 my %args = @_;
731              
732 10         30 _fmap( $code, %args, collect => "void" )
733             }
734             *fmap0 = \&fmap_void;
735              
736             =head1 AUTHOR
737              
738             Paul Evans <leonerd@leonerd.org.uk>
739              
740             =cut
741              
742             0x55AA;