File Coverage

blib/lib/IO/Async/Function.pm
Criterion Covered Total %
statement 240 251 95.6
branch 83 112 74.1
condition 20 32 62.5
subroutine 47 48 97.9
pod 8 11 72.7
total 398 454 87.6


line stmt bran cond sub pod time code
1             # You may distribute under the terms of either the GNU General Public License
2             # or the Artistic License (the same terms as Perl itself)
3             #
4             # (C) Paul Evans, 2011-2021 -- leonerd@leonerd.org.uk
5              
6             package IO::Async::Function;
7              
8 10     10   341634 use strict;
  10         23  
  10         342  
9 10     10   53 use warnings;
  10         22  
  10         463  
10              
11             our $VERSION = '0.79';
12              
13 10     10   59 use base qw( IO::Async::Notifier );
  10         16  
  10         2547  
14 10     10   4917 use IO::Async::Timer::Countdown;
  10         26  
  10         274  
15              
16 10     10   64 use Carp;
  10         17  
  10         576  
17              
18 10     10   57 use List::Util qw( first );
  10         29  
  10         744  
19              
20 10     10   3658 use Struct::Dumb qw( readonly_struct );
  10         14727  
  10         58  
21              
22             readonly_struct Pending => [qw( priority f )];
23              
24             =head1 NAME
25              
26             C<IO::Async::Function> - call a function asynchronously
27              
28             =head1 SYNOPSIS
29              
30             use IO::Async::Function;
31              
32             use IO::Async::Loop;
33             my $loop = IO::Async::Loop->new;
34              
35             my $function = IO::Async::Function->new(
36             code => sub {
37             my ( $number ) = @_;
38             return is_prime( $number );
39             },
40             );
41              
42             $loop->add( $function );
43              
44             $function->call(
45             args => [ 123454321 ],
46             )->on_done( sub {
47             my $isprime = shift;
48             print "123454321 " . ( $isprime ? "is" : "is not" ) . " a prime number\n";
49             })->on_fail( sub {
50             print STDERR "Cannot determine if it's prime - $_[0]\n";
51             })->get;
52              
53             =head1 DESCRIPTION
54              
55             This subclass of L<IO::Async::Notifier> wraps a function body in a collection
56             of worker processes, to allow it to execute independently of the main process.
57             The object acts as a proxy to the function, allowing invocations to be made by
58             passing in arguments, and invoking a continuation in the main process when the
59             function returns.
60              
61             The object represents the function code itself, rather than one specific
62             invocation of it. It can be called multiple times, by the C<call> method.
63             Multiple outstanding invocations can be called; they will be dispatched in
64             the order they were queued. If only one worker process is used then results
65             will be returned in the order they were called. If multiple are used, then
66             each request will be sent in the order called, but timing differences between
67             each worker may mean results are returned in a different order.
68              
69             Since the code block will be called multiple times within the same child
70             process, it must take care not to modify any of its state that might affect
71             subsequent calls. Since it executes in a child process, it cannot make any
72             modifications to the state of the parent program. Therefore, all the data
73             required to perform its task must be represented in the call arguments, and
74             all of the result must be represented in the return values.
75              
76             The Function object is implemented using an L<IO::Async::Routine> with two
77             L<IO::Async::Channel> objects to pass calls into and results out from it.
78              
79             The L<IO::Async> framework generally provides mechanisms for multiplexing IO
80             tasks between different handles, so there aren't many occasions when such an
81             asynchronous function is necessary. Two cases where this does become useful
82             are:
83              
84             =over 4
85              
86             =item 1.
87              
88             When a large amount of computationally-intensive work needs to be performed
89             (for example, the C<is_prime> test in the example in the C<SYNOPSIS>).
90              
91             =item 2.
92              
93             When a blocking OS syscall or library-level function needs to be called, and
94             no nonblocking or asynchronous version is supplied. This is used by
95             L<IO::Async::Resolver>.
96              
97             =back
98              
99             This object is ideal for representing "pure" functions; that is, blocks of
100             code which have no stateful effect on the process, and whose result depends
101             only on the arguments passed in. For a more general co-routine ability, see
102             also L<IO::Async::Routine>.
103              
104             =cut
105              
106             =head1 PARAMETERS
107              
108             The following named parameters may be passed to C<new> or C<configure>:
109              
110             =head2 code => CODE
111              
112             The body of the function to execute.
113              
114             @result = $code->( @args )
115              
116             =head2 init_code => CODE
117              
118             Optional. If defined, this is invoked exactly once in every child process or
119             thread, after it is created, but before the first invocation of the function
120             body itself.
121              
122             $init_code->()
123              
124             =head2 module => STRING
125              
126             =head2 func => STRING
127              
128             An alternative to the C<code> argument, which names a module to load and a
129             function to call within it. C<module> should give a perl module name (i.e.
130             C<Some::Name>, not a filename like F<Some/Name.pm>), and C<func> should give
131             the basename of a function within that module (i.e. without the module name
132             prefixed). It will be invoked, without extra arguments, as the main code
133             body of the object.
134              
135             The task of loading this module and resolving the resulting function from it
136             is only performed on the remote worker side, so the controlling process will
137             not need to actually load the module.
138              
139             =head2 init_func => STRING or ARRAY [ STRING, ... ]
140              
141             Optional addition to the C<module> and C<func> alternatives. Names a function
142             within the module to call each time a new worker is created.
143              
144             If this value is an array reference, its first element must be a string giving
145             the name of the function; the remaining values are passed to that function as
146             arguments.
147              
148             =head2 model => "fork" | "thread" | "spawn"
149              
150             Optional. Requests a specific L<IO::Async::Routine> model. If not supplied,
151             leaves the default choice up to Routine.
152              
153             =head2 min_workers => INT
154              
155             =head2 max_workers => INT
156              
157             The lower and upper bounds of worker processes to try to keep running. The
158             actual number running at any time will be kept somewhere between these bounds
159             according to load.
160              
161             =head2 max_worker_calls => INT
162              
163             Optional. If provided, stop a worker process after it has processed this
164             number of calls. (New workers may be started to replace stopped ones, within
165             the bounds given above).
166              
167             =head2 idle_timeout => NUM
168              
169             Optional. If provided, idle worker processes will be shut down after this
170             amount of time, if there are more than C<min_workers> of them.
171              
172             =head2 exit_on_die => BOOL
173              
174             Optional boolean, controls what happens after the C<code> throws an
175             exception. If missing or false, the worker will continue running to process
176             more requests. If true, the worker will be shut down. A new worker might be
177             constructed by the C<call> method to replace it, if necessary.
178              
179             =head2 setup => ARRAY
180              
181             Optional array reference. Specifies the C<setup> key to pass to the underlying
182             L<IO::Async::Process> when setting up new worker processes.
183              
184             =cut
185              
186             sub _init
187             {
                # Initialisation hook: chains to the parent class's _init, then sets
                # the default state: worker bounds of 1 (min) and 8 (max), an empty
                # table of workers keyed by worker ID, and an empty queue of calls
                # waiting for a free worker.
188 49     49   161 my $self = shift;
189 49         449 $self->SUPER::_init( @_ );
190              
191 49         200 $self->{min_workers} = 1;
192 49         109 $self->{max_workers} = 8;
193              
194 49         123 $self->{workers} = {}; # {$id} => IaFunction:Worker
195              
196 49         172 $self->{pending_queue} = [];
197             }
198              
199             sub configure
200             {
                # Applies named parameters to this Function. Recognised keys are
                # consumed from %params in groups (worker settings, idle_timeout,
                # worker bounds, code/module settings); whatever remains is passed
                # to SUPER::configure.
                # Croaks if both 'code' and 'func' end up defined, or if 'func' is
                # defined without 'module'.
201 49     49 1 103 my $self = shift;
202 49         484 my %params = @_;
203              
                # These keys are stored on the Function and also collected so they
                # can be pushed to any workers that already exist.
204 49         137 my %worker_params;
205 49         132 foreach (qw( model exit_on_die max_worker_calls )) {
206 147 100       454 $self->{$_} = $worker_params{$_} = delete $params{$_} if exists $params{$_};
207             }
208              
                # NOTE(review): 'max_worker_calls' is forwarded under that name, but
                # Worker::configure in this file consumes 'max_calls'; the forwarded
                # key would therefore reach the worker's SUPER::configure - confirm
                # this is intended.
209 49 100       188 if( keys %worker_params ) {
210 14         102 foreach my $worker ( $self->_worker_objects ) {
211 0         0 $worker->configure( %worker_params );
212             }
213             }
214              
                # idle_timeout: a false value removes any existing idle timer; an
                # existing timer is re-configured with the new delay; otherwise a new
                # countdown timer is created and added as a child.
215 49 100       181 if( exists $params{idle_timeout} ) {
216 7         29 my $timeout = delete $params{idle_timeout};
217 7 50       59 if( !$timeout ) {
    50          
218 0 0       0 $self->remove_child( delete $self->{idle_timer} ) if $self->{idle_timer};
219             }
220             elsif( my $idle_timer = $self->{idle_timer} ) {
221 0         0 $idle_timer->configure( delay => $timeout );
222             }
223             else {
224             $self->{idle_timer} = IO::Async::Timer::Countdown->new(
225             delay => $timeout,
226             on_expire => $self->_capture_weakself( sub {
227 1 50   1   6 my $self = shift or return;
228 1         6 my $workers = $self->{workers};
229              
230             # Shut down at most one idle worker, starting from the highest
231             # ID. Since we search from lowest to assign work, this tries
232             # to ensure we'll shut down the least useful ones first,
233             # keeping more useful ones in memory (page/cache warmth, etc..)
                # The IDs are ordered by a plain (string) sort.
234 1         10 foreach my $id ( reverse sort keys %$workers ) {
235 1 50       10 next if $workers->{$id}{busy};
236              
237 1         9 $workers->{$id}->stop;
238 1         7 last;
239             }
240              
241             # Still more?
                # Re-arm the timer while the idle count still exceeds min_workers.
242 1 50       10 $self->{idle_timer}->start if $self->workers_idle > $self->{min_workers};
243 7         115 } ),
244             );
245 7         46 $self->add_child( $self->{idle_timer} );
246             }
247             }
248              
                # New bounds are only recorded here; no workers are started or
                # stopped in response.
249 49         131 foreach (qw( min_workers max_workers )) {
250 98 100       313 $self->{$_} = delete $params{$_} if exists $params{$_};
251             # TODO: something about retuning
252             }
253              
254 49         102 my $need_restart;
255              
                # Supplying any of these keys marks the Function as needing a restart
                # so that workers are recreated with the new settings.
256 49         153 foreach (qw( init_code code module init_func func setup )) {
257 294 100       800 $need_restart++, $self->{$_} = delete $params{$_} if exists $params{$_};
258             }
259              
260             defined $self->{code} and defined $self->{func} and
261 49 50 66     640 croak "Cannot ->configure both 'code' and 'func'";
262             defined $self->{func} and !defined $self->{module} and
263 49 50 66     351 croak "'func' parameter requires a 'module' as well";
264              
265 49         297 $self->SUPER::configure( %params );
266              
                # The restart only happens when the Function is already in a loop.
267 49 50 33     455 if( $need_restart and $self->loop ) {
268 0         0 $self->stop;
269 0         0 $self->start;
270             }
271             }
272              
273             sub _add_to_loop
274             {
                # Hook run when the Function is added to a loop: chains to the parent
                # implementation, then starts the initial workers via ->start.
275 49     49   101 my $self = shift;
276 49         249 $self->SUPER::_add_to_loop( @_ );
277              
278 49         189 $self->start;
279             }
280              
281             sub _remove_from_loop
282             {
                # Hook run when the Function is removed from a loop: stops all
                # workers first, then chains to the parent implementation.
283 37     37   77 my $self = shift;
284              
285 37         140 $self->stop;
286              
287 37         201 $self->SUPER::_remove_from_loop( @_ );
288             }
289              
290             =head1 METHODS
291              
292             The following methods documented with a trailing call to C<< ->get >> return
293             L<Future> instances.
294              
295             =cut
296              
297             =head2 start
298              
299             $function->start
300              
301             Start the worker processes
302              
303             =cut
304              
305             sub start
306             {
                # Creates min_workers new workers by calling _new_worker once per
                # slot. Existing workers are not counted, so each call adds
                # min_workers more.
307 51     51 1 105 my $self = shift;
308              
309 51         276 $self->_new_worker for 1 .. $self->{min_workers};
310             }
311              
312             =head2 stop
313              
314             $function->stop
315              
316             Stop the worker processes
317              
318             $f = $function->stop
319              
320             I
321              
322             If called in non-void context, returns a L<Future> instance that
323             will complete once every worker process has stopped and exited. This may be
324             useful for waiting until all of the processes are waited on, or other
325             edge-cases, but is not otherwise particularly useful.
326              
327             =cut
328              
329             sub stop
330             {
                # Stops every current worker. In non-void context, returns a
                # Future->needs_all over the futures returned by each worker's
                # ->stop; in void context nothing is collected or returned.
331 43     43 1 79 my $self = shift;
332              
                # Flag consulted by the workers' on_finish handler so that stopped
                # workers are not replaced.
                # NOTE(review): nothing in the visible source clears this flag again
                # (->start does not) - confirm behaviour after ->restart.
333 43         442 $self->{stopping} = 1;
334              
335 43         104 my @f;
336              
                # Worker ->stop is itself context-sensitive, so it is only called in
                # non-void context when the caller wants the combined future.
337 43         141 foreach my $worker ( $self->_worker_objects ) {
338 38 100       241 defined wantarray ? push @f, $worker->stop : $worker->stop;
339             }
340              
341 43 100       1634 return Future->needs_all( @f ) if defined wantarray;
342             }
343              
344             =head2 restart
345              
346             $function->restart
347              
348             Gracefully stop and restart all the worker processes.
349              
350             =cut
351              
352             sub restart
353             {
                # Stops all current workers (->stop, in void context) and then starts
                # a fresh set (->start).
354 2     2 1 1882 my $self = shift;
355              
356 2         32 $self->stop;
357 2         22 $self->start;
358             }
359              
360             =head2 call
361              
362             @result = $function->call( %params )->get
363              
364             Schedules an invocation of the contained function to be executed on one of the
365             worker processes. If a non-busy worker is available now, it will be called
366             immediately. If not, it will be queued and sent to the next free worker that
367             becomes available.
368              
369             The request will already have been serialised by the marshaller, so it will be
370             safe to modify any referenced data structures in the arguments after this call
371             returns.
372              
373             The C<%params> hash takes the following keys:
374              
375             =over 8
376              
377             =item args => ARRAY
378              
379             A reference to the array of arguments to pass to the code.
380              
381             =item priority => NUM
382              
383             Optional. Defines the sorting order when no workers are available and calls
384             must be queued for later. A default of zero will apply if not provided.
385              
386             Higher values cause the call to be considered more important, and will be
387             placed earlier in the queue than calls with a smaller value. Calls of equal
388             priority are still handled in FIFO order.
389              
390             =back
391              
392             If the function body returns normally the list of results are provided as the
393             (successful) result of returned future. If the function throws an exception
394             this results in a failed future. In the special case that the exception is in
395             fact an unblessed C<ARRAY> reference, this array is unpacked and used as-is
396             for the C<fail> result. If the exception is not such a reference, it is used
397             as the first argument to C<fail>, in the category of C<error>.
398              
399             $f->done( @result )
400              
401             $f->fail( @{ $exception } )
402             $f->fail( $exception, error => )
403              
404             =head2 call (void)
405              
406             $function->call( %params )
407              
408             When not returning a future, the C<on_result>, C<on_return> and C<on_error>
409             arguments give continuations to handle successful results or failure.
410              
411             =over 8
412              
413             =item on_result => CODE
414              
415             A continuation that is invoked when the code has been executed. If the code
416             returned normally, it is called as:
417              
418             $on_result->( 'return', @values )
419              
420             If the code threw an exception, or some other error occurred such as a closed
421             connection or the process died, it is called as:
422              
423             $on_result->( 'error', $exception_name )
424              
425             =item on_return => CODE and on_error => CODE
426              
427             An alternative to C<on_result>. Two continuations to use in either of the
428             circumstances given above. They will be called directly, without the leading
429             'return' or 'error' value.
430              
431             =back
432              
433             =cut
434              
435             sub debug_printf_call
436             {
                # Emits the debug line "CALL" via debug_printf. Any further arguments
                # (->call passes the call's arguments) are ignored here.
437 75     75 0 139 my $self = shift;
438 75         666 $self->debug_printf( "CALL" );
439             }
440              
441             sub debug_printf_result
442             {
                # Emits the debug line "RESULT" via debug_printf. Any further
                # arguments (->call passes the result values) are ignored here.
443 52     52 0 118 my $self = shift;
444 52         179 $self->debug_printf( "RESULT" );
445             }
446              
447             sub debug_printf_failure
448             {
                # Emits the debug line "FAIL <err>" via debug_printf, where <err> is
                # the first argument after $self; later arguments are ignored.
449 21     21 0 73 my $self = shift;
450 21         63 my ( $err ) = @_;
451 21         140 $self->debug_printf( "FAIL $err" );
452             }
453              
454             sub call
455             {
                # Schedules one invocation of the function. Takes named parameters:
                #   args      => ARRAY ref of arguments (required)
                #   priority  => queue priority, used only when the call has to wait
                #   on_result, or on_return + on_error => continuations
                # Returns the call's future when called in non-void context.
                # Croaks if the Function is not in a loop, if 'args' is not an ARRAY
                # ref, if a continuation is not a reference, or if called in void
                # context without continuations.
456 90     90 1 7978 my $self = shift;
457 90         1065 my %params = @_;
458              
459             # TODO: possibly just queue this?
460 90 50       386 $self->loop or croak "Cannot ->call on a Function not yet in a Loop";
461              
462 90         232 my $args = delete $params{args};
463 90 50       457 ref $args eq "ARRAY" or croak "Expected 'args' to be an array";
464              
                # Normalise the two continuation styles into $on_done / $on_fail.
465 90         183 my ( $on_done, $on_fail );
466 90 100 66     784 if( defined $params{on_result} ) {
    100          
    50          
467 2         6 my $on_result = delete $params{on_result};
468 2 50       10 ref $on_result or croak "Expected 'on_result' to be a reference";
469              
470             $on_done = sub {
471 2     2   56 $on_result->( return => @_ );
472 2         21 };
                # On failure the first failure value is dropped; only the remaining
                # values are handed to on_result after the 'error' tag.
473             $on_fail = sub {
474 0     0   0 my ( $err, @values ) = @_;
475 0         0 $on_result->( error => @values );
476 2         24 };
477             }
478             elsif( defined $params{on_return} and defined $params{on_error} ) {
479 44         175 my $on_return = delete $params{on_return};
480 44 50       192 ref $on_return or croak "Expected 'on_return' to be a reference";
481 44         136 my $on_error = delete $params{on_error};
482 44 50       293 ref $on_error or croak "Expected 'on_error' to be a reference";
483              
484 44         116 $on_done = $on_return;
485 44         102 $on_fail = $on_error;
486             }
487             elsif( !defined wantarray ) {
488 0         0 croak "Expected either 'on_result' or 'on_return' and 'on_error' keys, or to return a Future";
489             }
490              
491 90         406 $self->debug_printf_call( @$args );
492              
                # The arguments are encoded once, up front, before any dispatch or
                # queueing takes place.
493 90         1627 my $request = IO::Async::Channel->encode( $args );
494              
495 90         205 my $future;
496 90 100       283 if( my $worker = $self->_get_worker ) {
497 66         695 $future = $self->_call_worker( $worker, $request );
498             }
499             else {
                # No worker available: park the call on the pending queue together
                # with a future that _dispatch_pending completes with
                # ( $self, $worker ) once a worker is free.
500 23         114 $self->debug_printf( "QUEUE" );
501 23         79 my $queue = $self->{pending_queue};
502              
503             my $next = Pending(
504 23   100     235 my $priority = $params{priority} || 0,
505             my $wait_f = $self->loop->new_future,
506             );
507              
                # A non-zero priority is inserted before the first queued entry of
                # strictly lower priority, or appended if there is none; priority 0
                # is always appended.
508 23 100       502 if( $priority ) {
509 9     12   189 my $idx = first { $queue->[$_]->priority < $priority } 0 .. $#$queue;
  12         126  
510 9   33     129 splice @$queue, $idx // $#$queue+1, 0, ( $next );
511             }
512             else {
513 14         68 push @$queue, $next;
514             }
515              
516             $future = $wait_f->then( sub {
517 22     22   1823 my ( $self, $worker ) = @_;
518 22         90 $self->_call_worker( $worker, $request );
519 23         218 });
520             }
521              
                # Debug logging hooks; they hold $self weakly and do nothing if it
                # has gone away.
522             $future->on_done( $self->_capture_weakself( sub {
523 66 50   66   190 my $self = shift or return;
524 66         216 $self->debug_printf_result( @_ );
525 89         2848 }));
526             $future->on_fail( $self->_capture_weakself( sub {
527 21 50   21   121 my $self = shift or return;
528 21         158 $self->debug_printf_failure( @_ );
529 89         2407 }));
530              
531 89 100       2936 $future->on_done( $on_done ) if $on_done;
532 89 100       1093 $future->on_fail( $on_fail ) if $on_fail;
533              
534 89 100       1634 return $future if defined wantarray;
535              
536             # Caller is not going to keep hold of the Future, so we have to ensure it
537             # stays alive somehow
                # The ->else maps a failure to an empty successful future before it
                # is adopted (presumably so the adopted future never reports the
                # failure a second time - confirm against adopt_future).
538 36     14   543 $self->adopt_future( $future->else( sub { Future->done } ) );
  14         737  
539             }
540              
541             sub _worker_objects
542             {
                # Returns the list of worker objects currently held in the workers
                # table (the hash values, in hash order).
543 230     230   341 my $self = shift;
544 230         336 return values %{ $self->{workers} };
  230         1112  
545             }
546              
547             =head2 workers
548              
549             $count = $function->workers
550              
551             Returns the total number of worker processes available
552              
553             =cut
554              
555             sub workers
556             {
                # Returns the number of entries in the workers table.
557 53     53 1 8689 my $self = shift;
558 53         122 return scalar keys %{ $self->{workers} };
  53         597  
559             }
560              
561             =head2 workers_busy
562              
563             $count = $function->workers_busy
564              
565             Returns the number of worker processes that are currently busy
566              
567             =cut
568              
569             sub workers_busy
570             {
                # Returns the count of workers whose 'busy' flag is true.
571 9     9 1 4797 my $self = shift;
572 9         24 return scalar grep { $_->{busy} } $self->_worker_objects;
  9         48  
573             }
574              
575             =head2 workers_idle
576              
577             $count = $function->workers_idle
578              
579             Returns the number of worker processes that are currently idle
580              
581             =cut
582              
583             sub workers_idle
584             {
                # Returns the count of workers whose 'busy' flag is false or unset.
585 164     164 1 309 my $self = shift;
586 164         498 return scalar grep { !$_->{busy} } $self->_worker_objects;
  169         944  
587             }
588              
589             sub _new_worker
590             {
                # Constructs a new Worker from this Function's stored settings, adds
                # it as a child, records it in the workers table under its ID and
                # returns it.
591 55     55   130 my $self = shift;
592              
                # The Function's 'max_worker_calls' setting is passed to the worker
                # as 'max_calls'.
593             my $worker = IO::Async::Function::Worker->new(
594 440         2045 ( map { $_ => $self->{$_} } qw( model init_code code module init_func func setup exit_on_die ) ),
595             max_calls => $self->{max_worker_calls},
596              
                # When a worker finishes and the Function is not stopping, top back
                # up to min_workers and try to dispatch any queued call.
597             on_finish => $self->_capture_weakself( sub {
598 3 50   3   21 my $self = shift or return;
599 3         9 my ( $worker ) = @_;
600              
601 3 50       45 return if $self->{stopping};
602              
603 0 0       0 $self->_new_worker if $self->workers < $self->{min_workers};
604              
605 0         0 $self->_dispatch_pending;
606 55         158 } ),
607             );
608              
609 55         409 $self->add_child( $worker );
610              
611 53         1131 return $self->{workers}{$worker->id} = $worker;
612             }
613              
614             sub _get_worker
615             {
                # Picks a worker for a new call. Returns the first non-busy worker
                # when IDs are taken in plain (string) sort order; failing that,
                # creates a new worker if the count is below max_workers; otherwise
                # returns undef to signal that the call must be queued.
616 113     113   207 my $self = shift;
617              
618 113         161 foreach ( sort keys %{ $self->{workers} } ) {
  113         558  
619 104 100       533 return $self->{workers}{$_} if !$self->{workers}{$_}{busy};
620             }
621              
622 35 100       193 if( $self->workers < $self->{max_workers} ) {
623 12         76 return $self->_new_worker;
624             }
625              
626 23         164 return undef;
627             }
628              
629             sub _call_worker
630             {
                # Sends a request to the given worker and returns the future from
                # the worker's ->call. If no idle workers remain afterwards, the idle
                # timer (if any) is stopped.
                #
                # NOTE(review): both callers in this file invoke this as
                # _call_worker( $worker, $request ), so $type receives the encoded
                # request and $args is undef. Worker::call reads only its first
                # argument, so the request still arrives; the names here do not
                # reflect what is actually passed.
631 88     88   209 my $self = shift;
632 88         352 my ( $worker, $type, $args ) = @_;
633              
634 88         655 my $future = $worker->call( $type, $args );
635              
636 88 100       2332 if( $self->workers_idle == 0 ) {
637 86 100       469 $self->{idle_timer}->stop if $self->{idle_timer};
638             }
639              
640 88         231 return $future;
641             }
642              
643             sub _dispatch_pending
644             {
                # Hands at most one queued call to a free worker. Entries whose
                # future was cancelled are discarded and the next one is tried. If
                # the queue is empty, the idle timer may be started instead.
645 88     88   219 my $self = shift;
646              
647 88         202 while( my $next = shift @{ $self->{pending_queue} } ) {
  89         665  
                # NOTE(review): $next has already been shifted off the queue here;
                # if _get_worker returns undef the entry is not re-queued and its
                # future is never completed - confirm this cannot happen in practice.
648 23 50       200 my $worker = $self->_get_worker or return;
649              
650 23         125 my $f = $next->f;
651              
652 23 100       233 next if $f->is_cancelled;
653              
                # Completing the wait future runs the ->then block set up in ->call,
                # which performs the actual _call_worker.
654 22         378 $self->debug_printf( "UNQUEUE" );
655 22         200 $f->done( $self, $worker );
656 22         1087 return;
657             }
658              
                # Reached only when the queue is empty: start the idle timer if more
                # than min_workers are idle and it is not already running.
659 66 100       207 if( $self->workers_idle > $self->{min_workers} ) {
660 19 100 66     206 $self->{idle_timer}->start if $self->{idle_timer} and !$self->{idle_timer}->is_running;
661             }
662             }
663              
664             package # hide from indexer
665             IO::Async::Function::Worker;
666              
667 10     10   22061 use base qw( IO::Async::Routine );
  10         23  
  10         6191  
668              
669 10     10   77 use Carp;
  10         21  
  10         552  
670              
671 10     10   5920 use IO::Async::Channel;
  10         940  
  10         349  
672              
673 10     10   6815 use IO::Async::Internals::FunctionWorker;
  10         27  
  10         8235  
674              
675             sub new
676             {
                # Constructs a Worker with a fresh pair of channels: one carrying
                # arguments in, one carrying results out. The 'code' or
                # 'module'/'func' parameters are rewritten so that the underlying
                # Routine runs the FunctionWorker loop instead of the user's function
                # directly. Returns the new worker object.
677 55     55   242 my $class = shift;
678 55         767 my %params = @_;
679              
680 55         382 my $arg_channel = IO::Async::Channel->new;
681 55         168 my $ret_channel = IO::Async::Channel->new;
682              
683 55         91 my $send_initial;
684              
                # 'code' form: wrap the user's code in a sub that runs init_code (if
                # defined) once and then enters the FunctionWorker runloop.
685 55 100       170 if( defined( my $code = $params{code} ) ) {
    50          
686 49         105 my $init_code = $params{init_code};
687              
688             $params{code} = sub {
689 1 50   1   12 $init_code->() if defined $init_code;
690              
691 1         29 IO::Async::Internals::FunctionWorker::runloop( $code, $arg_channel, $ret_channel );
692 49         646 };
693             }
                # 'module'/'func' form: point the Routine at
                # IO::Async::Internals::FunctionWorker::run_worker, and remember the
                # user's module, function and init function (with its arguments) so
                # they can be sent as the first message on the argument channel.
694             elsif( defined( my $func = $params{func} ) ) {
695 6         38 my $module = $params{module};
696 6         39 my $init_func = $params{init_func};
697 6         29 my @init_args;
698              
699 6         68 $params{module} = "IO::Async::Internals::FunctionWorker";
700 6         38 $params{func} = "run_worker";
701              
702 6 50       31 ( $init_func, @init_args ) = @$init_func if ref( $init_func ) eq "ARRAY";
703              
704 6         42 $send_initial = [ $module, $func, $init_func, @init_args ];
705             }
706              
                # These two keys are handled above and are not passed to the parent
                # constructor.
707 55         255 delete @params{qw( init_code init_func )};
708              
709 55         443 my $worker = $class->SUPER::new(
710             %params,
711             channels_in => [ $arg_channel ],
712             channels_out => [ $ret_channel ],
713             );
714              
715 55         151 $worker->{arg_channel} = $arg_channel;
716 55         102 $worker->{ret_channel} = $ret_channel;
717              
718 55 100       117 $worker->{send_initial} = $send_initial if $send_initial;
719              
720 55         185 return $worker;
721             }
722              
723             sub _add_to_loop
724             {
                # Hook run when the worker is added to a loop: chains to the parent
                # implementation, then sends the stored initial message (set by ->new
                # in the 'module'/'func' form) on the argument channel. The message is
                # deleted from the object as it is sent, so it goes out only once.
725 55     55   625 my $self = shift;
726 55         267 $self->SUPER::_add_to_loop( @_ );
727              
728 53 100       5951 $self->{arg_channel}->send( delete $self->{send_initial} ) if $self->{send_initial};
729             }
730              
731             sub configure
732             {
                # Stores 'exit_on_die' and 'max_calls' on the worker when present,
                # removing them from %params; everything else is passed to
                # SUPER::configure.
733 55     55   259 my $self = shift;
734 55         297 my %params = @_;
735              
736 55   33     461 exists $params{$_} and $self->{$_} = delete $params{$_} for qw( exit_on_die max_calls );
737              
738 55         418 $self->SUPER::configure( %params );
739             }
740              
741             sub stop
742             {
                # Stops this worker: closes its argument channel and, if it still has
                # a parent Function, removes itself from that Function's workers
                # table. In non-void context returns the worker's result_future;
                # otherwise returns undef.
743 48     48   104 my $worker = shift;
744 48         286 $worker->{arg_channel}->close;
745              
746 48         83 my $ret;
747 48 100       194 $ret = $worker->result_future if defined wantarray;
748              
749 48 50       192 if( my $function = $worker->parent ) {
750 48         207 delete $function->{workers}{$worker->id};
751              
                # A busy worker stays attached as a child until its current call
                # completes; ->call's on_ready handler removes it when it sees
                # remove_on_idle set. An idle worker is removed immediately.
752 48 100       173 if( $worker->{busy} ) {
753 10         123 $worker->{remove_on_idle}++;
754             }
755             else {
756 38         123 $function->remove_child( $worker );
757             }
758             }
759              
760 48         258 return $ret;
761             }
762              
763             sub call
764             {
                # Sends one already-encoded request to the worker process and returns
                # a future for its outcome. Only the first argument after the
                # invocant is used. The future succeeds with the returned values for
                # an "r" reply, fails with the reply's values for an "e" reply, and
                # fails with ( "closed", "closed" ) if the return channel reaches EOF.
765 88     88   201 my $worker = shift;
766 88         189 my ( $args ) = @_;
767              
768 88         506 $worker->{arg_channel}->send_encoded( $args );
769              
                # max_calls counts down towards zero. If it was never set, the
                # decrement takes it negative, so the zero test below never fires.
770 88         659 $worker->{busy} = 1;
771 88         257 $worker->{max_calls}--;
772              
773             return $worker->{ret_channel}->recv->then(
774             # on recv
775             $worker->_capture_weakself( sub {
776 85     85   184 my ( $worker, $result ) = @_;
777 85         255 my ( $type, @values ) = @$result;
778              
                # Retire the worker once its call allowance is used up, or after an
                # exception reply when exit_on_die is set.
779             $worker->stop if !$worker->{max_calls} or
780 85 100 66     545 $worker->{exit_on_die} && $type eq "e";
      100        
781              
782 85 100       248 if( $type eq "r" ) {
    50          
783 66         439 return Future->done( @values );
784             }
785             elsif( $type eq "e" ) {
786 19         166 return Future->fail( @values );
787             }
788             else {
789 0         0 die "Unrecognised type from worker - $type\n";
790             }
791             } ),
792             # on EOF
793             $worker->_capture_weakself( sub {
794 2     2   8 my ( $worker ) = @_;
795              
796 2         28 $worker->stop;
797              
798 2         26 return Future->fail( "closed", "closed" );
799             } )
                # Whatever the outcome: mark the worker idle, let the parent Function
                # dispatch a queued call, and finish removing a worker that was
                # stopped while busy.
800             )->on_ready( $worker->_capture_weakself( sub {
801 88     88   202 my ( $worker, $f ) = @_;
802 88         181 $worker->{busy} = 0;
803              
804 88         333 my $function = $worker->parent;
805 88 50       876 $function->_dispatch_pending if $function;
806              
807 88 100 66     827 $function->remove_child( $worker ) if $function and $worker->{remove_on_idle};
808 88         354 }));
809             }
810              
811             =head1 EXAMPLES
812              
813             =head2 Extended Error Information on Failure
814              
815             The array-unpacking form of exception indication allows the function body to
816             more precisely control the resulting failure from the C<call> future.
817              
818             my $divider = IO::Async::Function->new(
819             code => sub {
820             my ( $numerator, $divisor ) = @_;
821             $divisor == 0 and
822             die [ "Cannot divide by zero", div_zero => $numerator, $divisor ];
823              
824             return $numerator / $divisor;
825             }
826             );
827              
828             =head1 NOTES
829              
830             For the record, 123454321 is 11111 * 11111, a square number, and therefore not
831             prime.
832              
833             =head1 AUTHOR
834              
835             Paul Evans
836              
837             =cut
838              
839             0x55AA;