File Coverage

blib/lib/IO/Async/Function.pm
Criterion Covered Total %
statement 240 251 95.6
branch 83 112 74.1
condition 20 32 62.5
subroutine 47 48 97.9
pod 8 11 72.7
total 398 454 87.6


line stmt bran cond sub pod time code
1             # You may distribute under the terms of either the GNU General Public License
2             # or the Artistic License (the same terms as Perl itself)
3             #
4             # (C) Paul Evans, 2011-2021 -- leonerd@leonerd.org.uk
5              
6             package IO::Async::Function;
7              
8 10     10   342254 use strict;
  10         29  
  10         299  
9 10     10   51 use warnings;
  10         20  
  10         484  
10              
11             our $VERSION = '0.801';
12              
13 10     10   67 use base qw( IO::Async::Notifier );
  10         20  
  10         2633  
14 10     10   4876 use IO::Async::Timer::Countdown;
  10         26  
  10         296  
15              
16 10     10   62 use Carp;
  10         17  
  10         577  
17              
18 10     10   77 use List::Util qw( first );
  10         19  
  10         821  
19              
20 10     10   3494 use Struct::Dumb qw( readonly_struct );
  10         14783  
  10         63  
21              
22             readonly_struct Pending => [qw( priority f )];
23              
24             =head1 NAME
25              
26             C<IO::Async::Function> - call a function asynchronously
27              
28             =head1 SYNOPSIS
29              
30             use IO::Async::Function;
31              
32             use IO::Async::Loop;
33             my $loop = IO::Async::Loop->new;
34              
35             my $function = IO::Async::Function->new(
36             code => sub {
37             my ( $number ) = @_;
38             return is_prime( $number );
39             },
40             );
41              
42             $loop->add( $function );
43              
44             $function->call(
45             args => [ 123454321 ],
46             )->on_done( sub {
47             my $isprime = shift;
48             print "123454321 " . ( $isprime ? "is" : "is not" ) . " a prime number\n";
49             })->on_fail( sub {
50             print STDERR "Cannot determine if it's prime - $_[0]\n";
51             })->get;
52              
53             =head1 DESCRIPTION
54              
55             This subclass of L<IO::Async::Notifier> wraps a function body in a collection
56             of worker processes, to allow it to execute independently of the main process.
57             The object acts as a proxy to the function, allowing invocations to be made by
58             passing in arguments, and invoking a continuation in the main process when the
59             function returns.
60              
61             The object represents the function code itself, rather than one specific
62             invocation of it. It can be called multiple times, by the C<call> method.
63             Multiple outstanding invocations can be called; they will be dispatched in
64             the order they were queued. If only one worker process is used then results
65             will be returned in the order they were called. If multiple are used, then
66             each request will be sent in the order called, but timing differences between
67             each worker may mean results are returned in a different order.
68              
69             Since the code block will be called multiple times within the same child
70             process, it must take care not to modify any of its state that might affect
71             subsequent calls. Since it executes in a child process, it cannot make any
72             modifications to the state of the parent program. Therefore, all the data
73             required to perform its task must be represented in the call arguments, and
74             all of the result must be represented in the return values.
75              
76             The Function object is implemented using an L<IO::Async::Routine> with two
77             L<IO::Async::Channel> objects to pass calls into and results out from it.
78              
79             The L<IO::Async> framework generally provides mechanisms for multiplexing IO
80             tasks between different handles, so there aren't many occasions when such an
81             asynchronous function is necessary. Two cases where this does become useful
82             are:
83              
84             =over 4
85              
86             =item 1.
87              
88             When a large amount of computationally-intensive work needs to be performed
89             (for example, the C<is_prime> test in the example in the C<SYNOPSIS>).
90              
91             =item 2.
92              
93             When a blocking OS syscall or library-level function needs to be called, and
94             no nonblocking or asynchronous version is supplied. This is used by
95             L<IO::Async::Resolver>.
96              
97             =back
98              
99             This object is ideal for representing "pure" functions; that is, blocks of
100             code which have no stateful effect on the process, and whose result depends
101             only on the arguments passed in. For a more general co-routine ability, see
102             also L<IO::Async::Routine>.
103              
104             =cut
105              
106             =head1 PARAMETERS
107              
108             The following named parameters may be passed to C<new> or C<configure>:
109              
110             =head2 code => CODE
111              
112             The body of the function to execute.
113              
114             @result = $code->( @args )
115              
116             =head2 init_code => CODE
117              
118             Optional. If defined, this is invoked exactly once in every child process or
119             thread, after it is created, but before the first invocation of the function
120             body itself.
121              
122             $init_code->()
123              
124             =head2 module => STRING
125              
126             =head2 func => STRING
127              
128             I
129              
130             An alternative to the C<code> argument, which names a module to load and a
131             function to call within it. C<module> should give a perl module name (i.e.
132             C<Some::Name>, not a filename like F<Some/Name.pm>), and C<func> should give
133             the basename of a function within that module (i.e. without the module name
134             prefixed). It will be invoked, without extra arguments, as the main code
135             body of the object.
136              
137             The task of loading this module and resolving the resulting function from it
138             is only performed on the remote worker side, so the controlling process will
139             not need to actually load the module.
140              
141             =head2 init_func => STRING or ARRAY [ STRING, ... ]
142              
143             Optional addition to the C<module> and C<func> alternatives. Names a function
144             within the module to call each time a new worker is created.
145              
146             If this value is an array reference, its first element must be a string giving
147             the name of the function; the remaining values are passed to that function as
148             arguments.
149              
150             =head2 model => "fork" | "thread" | "spawn"
151              
152             Optional. Requests a specific L<IO::Async::Routine> model. If not supplied,
153             leaves the default choice up to Routine.
154              
155             =head2 min_workers => INT
156              
157             =head2 max_workers => INT
158              
159             The lower and upper bounds of worker processes to try to keep running. The
160             actual number running at any time will be kept somewhere between these bounds
161             according to load.
162              
163             =head2 max_worker_calls => INT
164              
165             Optional. If provided, stop a worker process after it has processed this
166             number of calls. (New workers may be started to replace stopped ones, within
167             the bounds given above).
168              
169             =head2 idle_timeout => NUM
170              
171             Optional. If provided, idle worker processes will be shut down after this
172             amount of time, if there are more than C<min_workers> of them.
173              
174             =head2 exit_on_die => BOOL
175              
176             Optional boolean, controls what happens after the C<code> throws an
177             exception. If missing or false, the worker will continue running to process
178             more requests. If true, the worker will be shut down. A new worker might be
179             constructed by the C<call> method to replace it, if necessary.
180              
181             =head2 setup => ARRAY
182              
183             Optional array reference. Specifies the C<setup> key to pass to the underlying
184             L<IO::Async::Process> when setting up new worker processes.
185              
186             =cut
187              
188             sub _init
189             {
190 49     49   129 my $self = shift;
191 49         358 $self->SUPER::_init( @_ );
192              
193 49         194 $self->{min_workers} = 1;
194 49         100 $self->{max_workers} = 8;
195              
196 49         135 $self->{workers} = {}; # {$id} => IaFunction:Worker
197              
198 49         153 $self->{pending_queue} = [];
199             }
200              
201             sub configure
202             {
203 49     49 1 252 my $self = shift;
204 49         233 my %params = @_;
205              
206 49         93 my %worker_params;
207 49         133 foreach (qw( model exit_on_die max_worker_calls )) {
208 147 100       442 $self->{$_} = $worker_params{$_} = delete $params{$_} if exists $params{$_};
209             }
210              
211 49 100       158 if( keys %worker_params ) {
212 14         100 foreach my $worker ( $self->_worker_objects ) {
213 0         0 $worker->configure( %worker_params );
214             }
215             }
216              
217 49 100       156 if( exists $params{idle_timeout} ) {
218 7         30 my $timeout = delete $params{idle_timeout};
219 7 50       71 if( !$timeout ) {
    50          
220 0 0       0 $self->remove_child( delete $self->{idle_timer} ) if $self->{idle_timer};
221             }
222             elsif( my $idle_timer = $self->{idle_timer} ) {
223 0         0 $idle_timer->configure( delay => $timeout );
224             }
225             else {
226             $self->{idle_timer} = IO::Async::Timer::Countdown->new(
227             delay => $timeout,
228             on_expire => $self->_capture_weakself( sub {
229 1 50   1   6 my $self = shift or return;
230 1         4 my $workers = $self->{workers};
231              
232             # Shut down at most one idle worker, starting from the highest
233             # ID. Since we search from lowest to assign work, this tries
234             # to ensure we'll shut down the least useful ones first,
235             # keeping more useful ones in memory (page/cache warmth, etc..)
236 1         8 foreach my $id ( reverse sort keys %$workers ) {
237 1 50       6 next if $workers->{$id}{busy};
238              
239 1         9 $workers->{$id}->stop;
240 1         8 last;
241             }
242              
243             # Still more?
244 1 50       13 $self->{idle_timer}->start if $self->workers_idle > $self->{min_workers};
245 7         97 } ),
246             );
247 7         55 $self->add_child( $self->{idle_timer} );
248             }
249             }
250              
251 49         114 foreach (qw( min_workers max_workers )) {
252 98 100       305 $self->{$_} = delete $params{$_} if exists $params{$_};
253             # TODO: something about retuning
254             }
255              
256 49         80 my $need_restart;
257              
258 49         118 foreach (qw( init_code code module init_func func setup )) {
259 294 100       677 $need_restart++, $self->{$_} = delete $params{$_} if exists $params{$_};
260             }
261              
262             defined $self->{code} and defined $self->{func} and
263 49 50 66     525 croak "Cannot ->configure both 'code' and 'func'";
264             defined $self->{func} and !defined $self->{module} and
265 49 50 66     361 croak "'func' parameter requires a 'module' as well";
266              
267 49         259 $self->SUPER::configure( %params );
268              
269 49 50 33     553 if( $need_restart and $self->loop ) {
270 0         0 $self->stop;
271 0         0 $self->start;
272             }
273             }
274              
275             sub _add_to_loop
276             {
277 49     49   96 my $self = shift;
278 49         200 $self->SUPER::_add_to_loop( @_ );
279              
280 49         176 $self->start;
281             }
282              
283             sub _remove_from_loop
284             {
285 37     37   80 my $self = shift;
286              
287 37         122 $self->stop;
288              
289 37         193 $self->SUPER::_remove_from_loop( @_ );
290             }
291              
292             =head1 METHODS
293              
294             The following methods documented with a trailing call to C<< ->get >> return
295             L<Future> instances.
296              
297             =cut
298              
299             =head2 start
300              
301             $function->start
302              
303             Start the worker processes
304              
305             =cut
306              
307             sub start
308             {
309 51     51 1 103 my $self = shift;
310              
311 51         235 $self->_new_worker for 1 .. $self->{min_workers};
312             }
313              
314             =head2 stop
315              
316             $function->stop
317              
318             Stop the worker processes
319              
320             $f = $function->stop
321              
322             I
323              
324             If called in non-void context, returns a L<Future> instance that
325             will complete once every worker process has stopped and exited. This may be
326             useful for waiting until all of the processes are waited on, or other
327             edge-cases, but is not otherwise particularly useful.
328              
329             =cut
330              
331             sub stop
332             {
333 43     43 1 131 my $self = shift;
334              
335 43         474 $self->{stopping} = 1;
336              
337 43         130 my @f;
338              
339 43         134 foreach my $worker ( $self->_worker_objects ) {
340 38 100       244 defined wantarray ? push @f, $worker->stop : $worker->stop;
341             }
342              
343 43 100       1359 return Future->needs_all( @f ) if defined wantarray;
344             }
345              
346             =head2 restart
347              
348             $function->restart
349              
350             Gracefully stop and restart all the worker processes.
351              
352             =cut
353              
354             sub restart
355             {
356 2     2 1 2158 my $self = shift;
357              
358 2         30 $self->stop;
359 2         31 $self->start;
360             }
361              
362             =head2 call
363              
364             @result = $function->call( %params )->get
365              
366             Schedules an invocation of the contained function to be executed on one of the
367             worker processes. If a non-busy worker is available now, it will be called
368             immediately. If not, it will be queued and sent to the next free worker that
369             becomes available.
370              
371             The request will already have been serialised by the marshaller, so it will be
372             safe to modify any referenced data structures in the arguments after this call
373             returns.
374              
375             The C<%params> hash takes the following keys:
376              
377             =over 8
378              
379             =item args => ARRAY
380              
381             A reference to the array of arguments to pass to the code.
382              
383             =item priority => NUM
384              
385             Optional. Defines the sorting order when no workers are available and calls
386             must be queued for later. A default of zero will apply if not provided.
387              
388             Higher values cause the call to be considered more important, and will be
389             placed earlier in the queue than calls with a smaller value. Calls of equal
390             priority are still handled in FIFO order.
391              
392             =back
393              
394             If the function body returns normally, the list of results is provided as the
395             (successful) result of the returned future. If the function throws an exception
396             this results in a failed future. In the special case that the exception is in
397             fact an unblessed C<ARRAY> reference, this array is unpacked and used as-is
398             for the C<fail> result. If the exception is not such a reference, it is used
399             as the first argument to C<fail>, in the category of C<error>.
400              
401             $f->done( @result )
402              
403             $f->fail( @{ $exception } )
404             $f->fail( $exception, error => )
405              
406             =head2 call (void)
407              
408             $function->call( %params )
409              
410             When not returning a future, the C<on_result>, C<on_return> and C<on_error>
411             arguments give continuations to handle successful results or failure.
412              
413             =over 8
414              
415             =item on_result => CODE
416              
417             A continuation that is invoked when the code has been executed. If the code
418             returned normally, it is called as:
419              
420             $on_result->( 'return', @values )
421              
422             If the code threw an exception, or some other error occurred such as a closed
423             connection or the process died, it is called as:
424              
425             $on_result->( 'error', $exception_name )
426              
427             =item on_return => CODE and on_error => CODE
428              
429             An alternative to C<on_result>. Two continuations to use in either of the
430             circumstances given above. They will be called directly, without the leading
431             'return' or 'error' value.
432              
433             =back
434              
435             =cut
436              
437             sub debug_printf_call
438             {
439 75     75 0 148 my $self = shift;
440 75         521 $self->debug_printf( "CALL" );
441             }
442              
443             sub debug_printf_result
444             {
445 52     52 0 112 my $self = shift;
446 52         209 $self->debug_printf( "RESULT" );
447             }
448              
449             sub debug_printf_failure
450             {
451 21     21 0 41 my $self = shift;
452 21         63 my ( $err ) = @_;
453 21         114 $self->debug_printf( "FAIL $err" );
454             }
455              
456             sub call
457             {
458 90     90 1 9055 my $self = shift;
459 90         995 my %params = @_;
460              
461             # TODO: possibly just queue this?
462 90 50       370 $self->loop or croak "Cannot ->call on a Function not yet in a Loop";
463              
464 90         217 my $args = delete $params{args};
465 90 50       599 ref $args eq "ARRAY" or croak "Expected 'args' to be an array";
466              
467 90         186 my ( $on_done, $on_fail );
468 90 100 66     698 if( defined $params{on_result} ) {
    100          
    50          
469 2         5 my $on_result = delete $params{on_result};
470 2 50       10 ref $on_result or croak "Expected 'on_result' to be a reference";
471              
472             $on_done = sub {
473 2     2   39 $on_result->( return => @_ );
474 2         24 };
475             $on_fail = sub {
476 0     0   0 my ( $err, @values ) = @_;
477 0         0 $on_result->( error => @values );
478 2         13 };
479             }
480             elsif( defined $params{on_return} and defined $params{on_error} ) {
481 44         139 my $on_return = delete $params{on_return};
482 44 50       274 ref $on_return or croak "Expected 'on_return' to be a reference";
483 44         115 my $on_error = delete $params{on_error};
484 44 50       320 ref $on_error or croak "Expected 'on_error' to be a reference";
485              
486 44         102 $on_done = $on_return;
487 44         113 $on_fail = $on_error;
488             }
489             elsif( !defined wantarray ) {
490 0         0 croak "Expected either 'on_result' or 'on_return' and 'on_error' keys, or to return a Future";
491             }
492              
493 90         392 $self->debug_printf_call( @$args );
494              
495 90         1039 my $request = IO::Async::Channel->encode( $args );
496              
497 90         206 my $future;
498 90 100       236 if( my $worker = $self->_get_worker ) {
499 66         241 $future = $self->_call_worker( $worker, $request );
500             }
501             else {
502 23         90 $self->debug_printf( "QUEUE" );
503 23         69 my $queue = $self->{pending_queue};
504              
505             my $next = Pending(
506 23   100     273 my $priority = $params{priority} || 0,
507             my $wait_f = $self->loop->new_future,
508             );
509              
510 23 100       410 if( $priority ) {
511 9     12   135 my $idx = first { $queue->[$_]->priority < $priority } 0 .. $#$queue;
  12         96  
512 9   33     138 splice @$queue, $idx // $#$queue+1, 0, ( $next );
513             }
514             else {
515 14         87 push @$queue, $next;
516             }
517              
518             $future = $wait_f->then( sub {
519 22     22   1980 my ( $self, $worker ) = @_;
520 22         73 $self->_call_worker( $worker, $request );
521 23         158 });
522             }
523              
524             $future->on_done( $self->_capture_weakself( sub {
525 66 50   66   197 my $self = shift or return;
526 66         360 $self->debug_printf_result( @_ );
527 89         2107 }));
528             $future->on_fail( $self->_capture_weakself( sub {
529 21 50   21   115 my $self = shift or return;
530 21         155 $self->debug_printf_failure( @_ );
531 89         2781 }));
532              
533 89 100       2923 $future->on_done( $on_done ) if $on_done;
534 89 100       1121 $future->on_fail( $on_fail ) if $on_fail;
535              
536 89 100       1623 return $future if defined wantarray;
537              
538             # Caller is not going to keep hold of the Future, so we have to ensure it
539             # stays alive somehow
540 36     14   344 $self->adopt_future( $future->else( sub { Future->done } ) );
  14         715  
541             }
542              
543             sub _worker_objects
544             {
545 230     230   384 my $self = shift;
546 230         317 return values %{ $self->{workers} };
  230         1108  
547             }
548              
549             =head2 workers
550              
551             $count = $function->workers
552              
553             Returns the total number of worker processes available
554              
555             =cut
556              
557             sub workers
558             {
559 53     53 1 8327 my $self = shift;
560 53         73 return scalar keys %{ $self->{workers} };
  53         564  
561             }
562              
563             =head2 workers_busy
564              
565             $count = $function->workers_busy
566              
567             Returns the number of worker processes that are currently busy
568              
569             =cut
570              
571             sub workers_busy
572             {
573 9     9 1 4467 my $self = shift;
574 9         126 return scalar grep { $_->{busy} } $self->_worker_objects;
  9         75  
575             }
576              
577             =head2 workers_idle
578              
579             $count = $function->workers_idle
580              
581             Returns the number of worker processes that are currently idle
582              
583             =cut
584              
585             sub workers_idle
586             {
587 164     164 1 358 my $self = shift;
588 164         523 return scalar grep { !$_->{busy} } $self->_worker_objects;
  169         878  
589             }
590              
591             sub _new_worker
592             {
593 55     55   162 my $self = shift;
594              
595             my $worker = IO::Async::Function::Worker->new(
596 440         1839 ( map { $_ => $self->{$_} } qw( model init_code code module init_func func setup exit_on_die ) ),
597             max_calls => $self->{max_worker_calls},
598              
599             on_finish => $self->_capture_weakself( sub {
600 3 50   3   15 my $self = shift or return;
601 3         9 my ( $worker ) = @_;
602              
603 3 50       18 return if $self->{stopping};
604              
605 0 0       0 $self->_new_worker if $self->workers < $self->{min_workers};
606              
607 0         0 $self->_dispatch_pending;
608 55         157 } ),
609             );
610              
611 55         330 $self->add_child( $worker );
612              
613 53         1101 return $self->{workers}{$worker->id} = $worker;
614             }
615              
616             sub _get_worker
617             {
618 113     113   193 my $self = shift;
619              
620 113         179 foreach ( sort keys %{ $self->{workers} } ) {
  113         532  
621 104 100       492 return $self->{workers}{$_} if !$self->{workers}{$_}{busy};
622             }
623              
624 35 100       175 if( $self->workers < $self->{max_workers} ) {
625 12         87 return $self->_new_worker;
626             }
627              
628 23         78 return undef;
629             }
630              
631             sub _call_worker
632             {
633 88     88   173 my $self = shift;
634 88         249 my ( $worker, $type, $args ) = @_;
635              
636 88         336 my $future = $worker->call( $type, $args );
637              
638 88 100       3037 if( $self->workers_idle == 0 ) {
639 86 100       621 $self->{idle_timer}->stop if $self->{idle_timer};
640             }
641              
642 88         294 return $future;
643             }
644              
645             sub _dispatch_pending
646             {
647 88     88   222 my $self = shift;
648              
649 88         133 while( my $next = shift @{ $self->{pending_queue} } ) {
  89         459  
650 23 50       175 my $worker = $self->_get_worker or return;
651              
652 23         111 my $f = $next->f;
653              
654 23 100       225 next if $f->is_cancelled;
655              
656 22         191 $self->debug_printf( "UNQUEUE" );
657 22         170 $f->done( $self, $worker );
658 22         1016 return;
659             }
660              
661 66 100       200 if( $self->workers_idle > $self->{min_workers} ) {
662 19 100 66     139 $self->{idle_timer}->start if $self->{idle_timer} and !$self->{idle_timer}->is_running;
663             }
664             }
665              
666             package # hide from indexer
667             IO::Async::Function::Worker;
668              
669 10     10   22203 use base qw( IO::Async::Routine );
  10         26  
  10         5985  
670              
671 10     10   75 use Carp;
  10         20  
  10         533  
672              
673 10     10   5641 use IO::Async::Channel;
  10         31  
  10         337  
674              
675 10     10   5451 use IO::Async::Internals::FunctionWorker;
  10         23  
  10         8541  
676              
677             sub new
678             {
679 55     55   232 my $class = shift;
680 55         691 my %params = @_;
681              
682 55         315 my $arg_channel = IO::Async::Channel->new;
683 55         160 my $ret_channel = IO::Async::Channel->new;
684              
685 55         82 my $send_initial;
686              
687 55 100       178 if( defined( my $code = $params{code} ) ) {
    50          
688 49         139 my $init_code = $params{init_code};
689              
690             $params{code} = sub {
691 1 50   1   53 $init_code->() if defined $init_code;
692              
693 1         33 IO::Async::Internals::FunctionWorker::runloop( $code, $arg_channel, $ret_channel );
694 49         344 };
695             }
696             elsif( defined( my $func = $params{func} ) ) {
697 6         27 my $module = $params{module};
698 6         26 my $init_func = $params{init_func};
699 6         21 my @init_args;
700              
701 6         31 $params{module} = "IO::Async::Internals::FunctionWorker";
702 6         32 $params{func} = "run_worker";
703              
704 6 50       32 ( $init_func, @init_args ) = @$init_func if ref( $init_func ) eq "ARRAY";
705              
706 6         28 $send_initial = [ $module, $func, $init_func, @init_args ];
707             }
708              
709 55         191 delete @params{qw( init_code init_func )};
710              
711 55         392 my $worker = $class->SUPER::new(
712             %params,
713             channels_in => [ $arg_channel ],
714             channels_out => [ $ret_channel ],
715             );
716              
717 55         160 $worker->{arg_channel} = $arg_channel;
718 55         112 $worker->{ret_channel} = $ret_channel;
719              
720 55 100       143 $worker->{send_initial} = $send_initial if $send_initial;
721              
722 55         213 return $worker;
723             }
724              
725             sub _add_to_loop
726             {
727 55     55   108 my $self = shift;
728 55         509 $self->SUPER::_add_to_loop( @_ );
729              
730 53 100       5656 $self->{arg_channel}->send( delete $self->{send_initial} ) if $self->{send_initial};
731             }
732              
733             sub configure
734             {
735 55     55   109 my $self = shift;
736 55         248 my %params = @_;
737              
738 55   33     466 exists $params{$_} and $self->{$_} = delete $params{$_} for qw( exit_on_die max_calls );
739              
740 55         263 $self->SUPER::configure( %params );
741             }
742              
743             sub stop
744             {
745 48     48   120 my $worker = shift;
746 48         250 $worker->{arg_channel}->close;
747              
748 48         81 my $ret;
749 48 100       163 $ret = $worker->result_future if defined wantarray;
750              
751 48 50       129 if( my $function = $worker->parent ) {
752 48         243 delete $function->{workers}{$worker->id};
753              
754 48 100       125 if( $worker->{busy} ) {
755 10         131 $worker->{remove_on_idle}++;
756             }
757             else {
758 38         90 $function->remove_child( $worker );
759             }
760             }
761              
762 48         189 return $ret;
763             }
764              
765             sub call
766             {
767 88     88   153 my $worker = shift;
768 88         183 my ( $args ) = @_;
769              
770 88         495 $worker->{arg_channel}->send_encoded( $args );
771              
772 88         627 $worker->{busy} = 1;
773 88         234 $worker->{max_calls}--;
774              
775             return $worker->{ret_channel}->recv->then(
776             # on recv
777             $worker->_capture_weakself( sub {
778 85     85   207 my ( $worker, $result ) = @_;
779 85         258 my ( $type, @values ) = @$result;
780              
781             $worker->stop if !$worker->{max_calls} or
782 85 100 66     484 $worker->{exit_on_die} && $type eq "e";
      100        
783              
784 85 100       232 if( $type eq "r" ) {
    50          
785 66         395 return Future->done( @values );
786             }
787             elsif( $type eq "e" ) {
788 19         190 return Future->fail( @values );
789             }
790             else {
791 0         0 die "Unrecognised type from worker - $type\n";
792             }
793             } ),
794             # on EOF
795             $worker->_capture_weakself( sub {
796 2     2   34 my ( $worker ) = @_;
797              
798 2         26 $worker->stop;
799              
800 2         34 return Future->fail( "closed", "closed" );
801             } )
802             )->on_ready( $worker->_capture_weakself( sub {
803 88     88   208 my ( $worker, $f ) = @_;
804 88         197 $worker->{busy} = 0;
805              
806 88         377 my $function = $worker->parent;
807 88 50       856 $function->_dispatch_pending if $function;
808              
809 88 100 66     709 $function->remove_child( $worker ) if $function and $worker->{remove_on_idle};
810 88         516 }));
811             }
812              
813             =head1 EXAMPLES
814              
815             =head2 Extended Error Information on Failure
816              
817             The array-unpacking form of exception indication allows the function body to
818             more precisely control the resulting failure from the C<call> future.
819              
820             my $divider = IO::Async::Function->new(
821             code => sub {
822             my ( $numerator, $divisor ) = @_;
823             $divisor == 0 and
824             die [ "Cannot divide by zero", div_zero => $numerator, $divisor ];
825              
826             return $numerator / $divisor;
827             }
828             );
829              
830             =head1 NOTES
831              
832             For the record, 123454321 is 11111 * 11111, a square number, and therefore not
833             prime.
834              
835             =head1 AUTHOR
836              
837             Paul Evans <leonerd@leonerd.org.uk>
838              
839             =cut
840              
841             0x55AA;