File Coverage

blib/lib/IO/Async/Routine.pm
Criterion Covered Total %
statement 141 191 73.8
branch 31 70 44.2
condition 15 26 57.6
subroutine 26 31 83.8
pod 5 5 100.0
total 218 323 67.4


line stmt bran cond sub pod time code
1             # You may distribute under the terms of either the GNU General Public License
2             # or the Artistic License (the same terms as Perl itself)
3             #
4             # (C) Paul Evans, 2012-2021 -- leonerd@leonerd.org.uk
5              
6             package IO::Async::Routine;
7              
8 12     12   173900 use strict;
  12         29  
  12         383  
9 12     12   69 use warnings;
  12         19  
  12         593  
10              
11             our $VERSION = '0.79';
12              
13 12     12   72 use base qw( IO::Async::Notifier );
  12         30  
  12         2212  
14              
15 12     12   98 use Carp;
  12         21  
  12         798  
16              
17 12     12   2570 use IO::Async::OS;
  12         23  
  12         366  
18 12     12   6951 use IO::Async::Process;
  12         32  
  12         385  
19              
20 12     12   1151 use Struct::Dumb qw( readonly_struct );
  12         4064  
  12         83  
21              
22             =head1 NAME
23              
24             C<IO::Async::Routine> - execute code in an independent sub-process or thread
25              
26             =head1 SYNOPSIS
27              
28             use IO::Async::Routine;
29             use IO::Async::Channel;
30              
31             use IO::Async::Loop;
32             my $loop = IO::Async::Loop->new;
33              
34             my $nums_ch = IO::Async::Channel->new;
35             my $ret_ch = IO::Async::Channel->new;
36              
37             my $routine = IO::Async::Routine->new(
38             channels_in => [ $nums_ch ],
39             channels_out => [ $ret_ch ],
40              
41             code => sub {
42             my @nums = @{ $nums_ch->recv };
43             my $ret = 0; $ret += $_ for @nums;
44              
45             # Can only send references
46             $ret_ch->send( \$ret );
47             },
48              
49             on_finish => sub {
50             say "The routine aborted early - $_[-1]";
51             $loop->stop;
52             },
53             );
54              
55             $loop->add( $routine );
56              
57             $nums_ch->send( [ 10, 20, 30 ] );
58             $ret_ch->recv(
59             on_recv => sub {
60             my ( $ch, $totalref ) = @_;
61             say "The total of 10, 20, 30 is: $$totalref";
62             $loop->stop;
63             }
64             );
65              
66             $loop->run;
67              
68             =head1 DESCRIPTION
69              
70             This L<IO::Async::Notifier> contains a body of code and executes it in a
71             sub-process or thread, allowing it to act independently of the main program.
72             Once set up, all communication with the code happens by values passed into or
73             out of the Routine via L<IO::Async::Channel> objects.
74              
75             The code contained within the Routine is free to make blocking calls without
76             stalling the rest of the program. This makes it useful for using existing code
77             which has no option not to block within an L<IO::Async>-based program.
78              
79             To create asynchronous wrappers of functions that return a value based only on
80             their arguments, and do not generally maintain state within the process, it may
81             be more convenient to use an L<IO::Async::Function> instead, which uses an
82             C<IO::Async::Routine> to contain the body of the function and manages the
83             Channels itself.
84              
85             =head2 Models
86              
87             A choice of detachment model is available. Each has various advantages and
88             disadvantages. Not all of them may be available on a particular system.
89              
90             =head3 The C<fork> model
91              
92             The code in this model runs within its own process, created by calling
93             C<fork()> from the main process. It is isolated from the rest of the program
94             in terms of memory, CPU time, and other resources. Because it is started
95             using C<fork()>, the initial process state is a clone of the main process.
96              
97             This model performs well on UNIX-like operating systems which possess a true
98             native C<fork()> system call, but is not available on C<MSWin32> for example,
99             because the operating system does not provide full fork-like semantics.
100              
101             =head3 The C<thread> model
102              
103             The code in this model runs inside a separate thread within the main process.
104             It therefore shares memory and other resources such as open filehandles with
105             the main thread. As with the C<fork> model, the initial thread state is cloned
106             from the main controlling thread.
107              
108             This model is only available on perls built to support threading.
109              
110             =head3 The C<spawn> model
111              
112             The code in this model runs within its own freshly-created process running
113             another copy of the perl interpreter. Similar to the C<fork> model it
114             therefore has its own memory, CPU time, and other resources. However, since it
115             is started freshly rather than by cloning the main process, it starts up in a
116             clean state, without any shared resources from its parent.
117              
118             Since this model creates a new fresh process rather than sharing existing
119             state, it cannot use the C<code> argument to specify the routine body; it must
120             instead use only the C<module> and C<func> arguments.
121              
122             In the current implementation this model requires exactly one input channel
123             and exactly one output channel; both must be present, and there cannot be more
124             than one of either.
125              
126             This model performs well on both UNIX and Windows-like operating systems,
127             because it does not need full fork semantics.
128              
129             =cut
130              
131             =head1 EVENTS
132              
133             =head2 on_finish $exitcode
134              
135             For C<fork()>-based Routines, this is invoked after the process has exited and
136             is passed the raw exitcode status.
137              
138             =head2 on_finish $type, @result
139              
140             For thread-based Routines, this is invoked after the thread has returned from
141             its code block and is passed the C<on_joined> result.
142              
143             As the behaviour of these events differs per model, it may be more convenient
144             to use C<on_return> and C<on_die> instead.
145              
146             =head2 on_return $result
147              
148             Invoked if the code block returns normally. Note that C<fork()>-based Routines
149             can only transport an integer result between 0 and 255, as this is the actual
150             C<exit()> value.
151              
152             =head2 on_die $exception
153              
154             Invoked if the code block fails with an exception.
155              
156             =cut
157              
158             =head1 PARAMETERS
159              
160             The following named parameters may be passed to C<new> or C<configure>:
161              
162             =head2 model => "fork" | "thread" | "spawn"
163              
164             Optional. Defines how the routine will detach itself from the main process.
165             See the L</Models> section above for more detail.
166              
167             If the model is not specified, the environment variable
168             C<IO_ASYNC_ROUTINE_MODEL> is used to pick a default. If that isn't defined,
169             C<fork> is preferred if it is available, otherwise C<thread>.
170              
171             =head2 channels_in => ARRAY of IO::Async::Channel
172              
173             ARRAY reference of L<IO::Async::Channel> objects to set up for passing values
174             in to the Routine.
175              
176             =head2 channels_out => ARRAY of IO::Async::Channel
177              
178             ARRAY reference of L<IO::Async::Channel> objects to set up for passing values
179             out of the Routine.
180              
181             =head2 code => CODE
182              
183             CODE reference to the body of the Routine, to execute once the channels are
184             set up.
185              
186             When using the C<spawn> model, this is not permitted; you must use C<module>
187             and C<func> instead.
188              
189             =head2 module => STRING
190              
191             =head2 func => STRING
192              
193             An alternative to the C<code> argument, which names a module to load and a
194             function to call within it. C<module> should give a perl module name (i.e.
195             C<Some::Name>, not a filename like F<Some/Name.pm>), and C<func> should give
196             the basename of a function within that module (i.e. without the module name
197             prefixed). It will be invoked as the main code body of the object, and passed
198             in a list of all the channels; first the input ones then the output ones.
199              
200             module::func( @channels_in, @channels_out )
201              
202             =head2 setup => ARRAY
203              
204             Optional. For C<fork()>-based Routines, gives a reference to an array to pass
205             to the underlying C<Loop> C<fork_child> method. Ignored for thread-based
206             Routines.
207              
208             =cut
209              
210 12 0       21354 use constant PREFERRED_MODEL =>
    50          
211             IO::Async::OS->HAVE_POSIX_FORK ? "fork" :
212             IO::Async::OS->HAVE_THREADS ? "thread" :
213 12     12   1367 die "No viable Routine models";
  12         28  
214              
215             sub _init
216             {
217 74     74   254 my $self = shift;
218 74         172 my ( $params ) = @_;
219              
220 74   50     1021 $params->{model} ||= $ENV{IO_ASYNC_ROUTINE_MODEL} || PREFERRED_MODEL;
      66        
221              
222 74         353 $self->SUPER::_init( @_ );
223             }
224              
225             my %SETUP_CODE;
226              
227             sub configure
228             {
229 74     74 1 207 my $self = shift;
230 74         301 my %params = @_;
231              
232             # TODO: Can only reconfigure when not running
233 74         228 foreach (qw( channels_in channels_out code module func setup on_finish on_return on_die )) {
234 666 100       1721 $self->{$_} = delete $params{$_} if exists $params{$_};
235             }
236              
237             defined $self->{code} and defined $self->{func} and
238 74 50 66     827 croak "Cannot ->configure both 'code' and 'func'";
239             defined $self->{func} and !defined $self->{module} and
240 74 50 66     377 croak "'func' parameter requires a 'module' as well";
241              
242 74 50       389 if( defined( my $model = delete $params{model} ) ) {
243 74 50 66     551 ( $SETUP_CODE{$model} ||= $self->can( "_setup_$model" ) )
244             or die "Unrecognised Routine model $model";
245              
246             # TODO: optional plugin "configure" check here?
247 74 50 66     1336 $model eq "fork" and !IO::Async::OS->HAVE_POSIX_FORK and
248             croak "Cannot use 'fork' model as fork() is not available";
249 74 50 33     278 $model eq "thread" and !IO::Async::OS->HAVE_THREADS and
250             croak "Cannot use 'thread' model as threads are not available";
251              
252 74         216 $self->{model} = $model;
253             }
254              
255 74         338 $self->SUPER::configure( %params );
256             }
257              
258             sub _add_to_loop
259             {
260 74     74   160 my $self = shift;
261 74         173 my ( $loop ) = @_;
262 74         313 $self->SUPER::_add_to_loop( $loop );
263              
264 74         178 my $model = $self->{model};
265              
266 74 50 33     311 my $code = ( $SETUP_CODE{$model} ||= $self->can( "_setup_$model" ) )
267             or die "Unrecognised Routine model $model";
268              
269 74         236 $self->$code();
270             }
271              
272             readonly_struct ChannelSetup => [qw( chan myfd otherfd )];
273              
274             sub _create_channels_in
275             {
276 74     74   200 my $self = shift;
277              
278 74         148 my @channels_in;
279              
280 74 100       139 foreach my $ch ( @{ $self->{channels_in} || [] } ) {
  74         690  
281 66         205 my ( $rd, $wr );
282 66 100       583 unless( $rd = $ch->_extract_read_handle ) {
283 65         786 ( $rd, $wr ) = IO::Async::OS->pipepair;
284             }
285 66         310 push @channels_in, ChannelSetup( $ch, $wr, $rd );
286             }
287              
288 74         927 return @channels_in;
289             }
290              
291             sub _create_channels_out
292             {
293 74     74   848 my $self = shift;
294              
295 74         159 my @channels_out;
296              
297 74 100       406 foreach my $ch ( @{ $self->{channels_out} || [] } ) {
  74         603  
298 67         168 my ( $rd, $wr );
299 67 50       311 unless( $wr = $ch->_extract_write_handle ) {
300 67         450 ( $rd, $wr ) = IO::Async::OS->pipepair;
301             }
302 67         300 push @channels_out, ChannelSetup( $ch, $rd, $wr );
303             }
304              
305 74         710 return @channels_out;
306             }
307              
308             sub _adopt_channels_in
309             {
310 74     74   151 my $self = shift;
311 74         167 my ( @channels_in ) = @_;
312              
313 74         185 foreach ( @channels_in ) {
314 66         229 my $ch = $_->chan;
315 66         477 $ch->setup_async_mode( write_handle => $_->myfd );
316 66 100       311 $self->add_child( $ch ) unless $ch->parent;
317             }
318             }
319              
320             sub _adopt_channels_out
321             {
322 74     74   150 my $self = shift;
323 74         158 my ( @channels_out ) = @_;
324              
325 74         197 foreach ( @channels_out ) {
326 67         207 my $ch = $_->chan;
327 67         480 $ch->setup_async_mode( read_handle => $_->myfd );
328 67 50       234 $self->add_child( $ch ) unless $ch->parent;
329             }
330             }
331              
332             sub _setup_fork
333             {
334 70     70   130 my $self = shift;
335              
336 70         536 my @channels_in = $self->_create_channels_in;
337 70         269 my @channels_out = $self->_create_channels_out;
338              
339 70         200 my $code = $self->{code};
340              
341 70         139 my $module = $self->{module};
342 70         113 my $func = $self->{func};
343              
344 70         164 my @setup = map { $_->otherfd => "keep" } @channels_in, @channels_out;
  125         721  
345              
346 70         508 my $setup = $self->{setup};
347 70 100       175 push @setup, @$setup if $setup;
348              
349             my $process = IO::Async::Process->new(
350             setup => \@setup,
351             code => sub {
352 1     1   6 foreach ( @channels_in, @channels_out ) {
353 2         236 $_->chan->setup_sync_mode( $_->otherfd );
354             }
355              
356 1 50       40 if( defined $module ) {
357 0         0 ( my $file = "$module.pm" ) =~ s{::}{/}g;
358 0         0 require $file;
359              
360 0 0       0 $code = $module->can( $func ) or
361             die "Module '$module' has no '$func'\n";
362             }
363              
364 1         3 my $ret = $code->( map { $_->chan } @channels_in, @channels_out );
  2         9  
365              
366 0         0 foreach ( @channels_in, @channels_out ) {
367 0         0 $_->chan->close;
368             }
369              
370 0         0 return $ret;
371             },
372             on_finish => $self->_replace_weakself( sub {
373 9 50   9   42 my $self = shift or return;
374 9         38 my ( $exitcode ) = @_;
375 9         80 $self->maybe_invoke_event( on_finish => $exitcode );
376              
377 9 50       57 unless( $exitcode & 0x7f ) {
378 9         46 $self->maybe_invoke_event( on_return => ($exitcode >> 8) );
379 9         97 $self->result_future->done( $exitcode >> 8 );
380             }
381             }),
382             on_exception => $self->_replace_weakself( sub {
383 4 50   4   46 my $self = shift or return;
384 4         36 my ( $exception, $errno, $exitcode ) = @_;
385              
386 4         46 $self->maybe_invoke_event( on_die => $exception );
387 4         72 $self->result_future->fail( $exception, routine => );
388 70         1392 }),
389             );
390              
391 70         387 $self->_adopt_channels_in ( @channels_in );
392 70         505 $self->_adopt_channels_out( @channels_out );
393              
394 70         406 $self->add_child( $self->{process} = $process );
395 69         784 $self->{id} = "P" . $process->pid;
396              
397 69         1696 $_->otherfd->close for @channels_in, @channels_out;
398             }
399              
400             sub _setup_thread
401             {
402 0     0   0 my $self = shift;
403              
404 0         0 my @channels_in = $self->_create_channels_in;
405 0         0 my @channels_out = $self->_create_channels_out;
406              
407 0         0 my $code = $self->{code};
408              
409 0         0 my $module = $self->{module};
410 0         0 my $func = $self->{func};
411              
412             my $tid = $self->loop->create_thread(
413             code => sub {
414 0     0   0 foreach ( @channels_in, @channels_out ) {
415 0         0 $_->chan->setup_sync_mode( $_->otherfd );
416 0         0 $_->myfd->close;
417             }
418              
419 0 0       0 if( defined $func ) {
420 0         0 ( my $file = "$module.pm" ) =~ s{::}{/}g;
421 0         0 require $file;
422              
423 0 0       0 $code = $module->can( $func ) or
424             die "Module '$module' has no '$func'\n";
425             }
426              
427 0         0 my $ret = $code->( map { $_->chan } @channels_in, @channels_out );
  0         0  
428              
429 0         0 foreach ( @channels_in, @channels_out ) {
430 0         0 $_->chan->close;
431             }
432              
433 0         0 return $ret;
434             },
435             on_joined => $self->_capture_weakself( sub {
436 0 0   0   0 my $self = shift or return;
437 0         0 my ( $ev, @result ) = @_;
438 0         0 $self->maybe_invoke_event( on_finish => @_ );
439              
440 0 0       0 if( $ev eq "return" ) {
441 0         0 $self->maybe_invoke_event( on_return => @result );
442 0         0 $self->result_future->done( @result );
443             }
444 0 0       0 if( $ev eq "died" ) {
445 0         0 $self->maybe_invoke_event( on_die => $result[0] );
446 0         0 $self->result_future->fail( $result[0], routine => );
447             }
448              
449 0         0 delete $self->{tid};
450 0         0 }),
451             );
452              
453 0         0 $self->{tid} = $tid;
454 0         0 $self->{id} = "T" . $tid;
455              
456 0         0 $self->_adopt_channels_in ( @channels_in );
457 0         0 $self->_adopt_channels_out( @channels_out );
458              
459 0         0 $_->otherfd->close for @channels_in, @channels_out;
460             }
461              
462             # The injected program that goes into spawn mode
463 12     12   118 use constant PERL_RUNNER => <<'EOF';
  12         25  
  12         8021  
464             ( my ( $module, $func ), @INC ) = @ARGV;
465             ( my $file = "$module.pm" ) =~ s{::}{/}g;
466             require $file;
467             my $code = $module->can( $func ) or die "Module '$module' has no '$func'\n";
468             require IO::Async::Channel;
469             exit $code->( IO::Async::Channel->new_stdin, IO::Async::Channel->new_stdout );
470             EOF
471              
472             sub _setup_spawn
473             {
474 4     4   36 my $self = shift;
475              
476             $self->{code} and
477 4 50       58 die "Cannot run IO::Async::Routine in 'spawn' with code\n";
478              
479 4 50       24 @{ $self->{channels_in} } == 1 or
  4         46  
480             die "IO::Async::Routine in 'spawn' mode requires exactly one input channel\n";
481 4 50       38 @{ $self->{channels_out} } == 1 or
  4         50  
482             die "IO::Async::Routine in 'spawn' mode requires exactly one output channel\n";
483              
484 4         78 my @channels_in = $self->_create_channels_in;
485 4         42 my @channels_out = $self->_create_channels_out;
486              
487 4         28 my $module = $self->{module};
488 4         30 my $func = $self->{func};
489              
490             my $process = IO::Async::Process->new(
491             setup => [
492             stdin => $channels_in[0]->otherfd,
493             stdout => $channels_out[0]->otherfd,
494             ],
495 42         560 command => [ $^X, "-E", PERL_RUNNER, $module, $func, grep { !ref } @INC ],
496             on_finish => $self->_replace_weakself( sub {
497 0 0   0   0 my $self = shift or return;
498 0         0 my ( $exitcode ) = @_;
499 0         0 $self->maybe_invoke_event( on_finish => $exitcode );
500              
501 0 0       0 unless( $exitcode & 0x7f ) {
502 0         0 $self->maybe_invoke_event( on_return => ($exitcode >> 8) );
503 0         0 $self->result_future->done( $exitcode >> 8 );
504             }
505             }),
506             on_exception => $self->_replace_weakself( sub {
507 0 0   0   0 my $self = shift or return;
508 0         0 my ( $exception, $errno, $exitcode ) = @_;
509              
510 0         0 $self->maybe_invoke_event( on_die => $exception );
511 0         0 $self->result_future->fail( $exception, routine => );
512 4         46 }),
513             );
514              
515 4         430 $self->_adopt_channels_in ( @channels_in );
516 4         54 $self->_adopt_channels_out( @channels_out );
517              
518 4         68 $self->add_child( $self->{process} = $process );
519 2         78 $self->{id} = "P" . $process->pid;
520              
521 2         79 $_->otherfd->close for @channels_in, @channels_out;
522             }
523              
524             =head1 METHODS
525              
526             =cut
527              
528             =head2 id
529              
530             $id = $routine->id
531              
532             Returns an ID string that uniquely identifies the Routine out of all the
533             currently-running ones. (The ID of already-exited Routines may be reused,
534             however.)
535              
536             =cut
537              
538             sub id
539             {
540 101     101 1 216 my $self = shift;
541 101         1809 return $self->{id};
542             }
543              
544             =head2 model
545              
546             $model = $routine->model
547              
548             Returns the detachment model in use by the Routine.
549              
550             =cut
551              
552             sub model
553             {
554 2     2 1 2760 my $self = shift;
555 2         56 return $self->{model};
556             }
557              
558             =head2 kill
559              
560             $routine->kill( $signal )
561              
562             Sends the specified signal to the routine code. This is either implemented by
563             C<CORE::kill()> or C<threads::kill> as required. Note that in the thread case
564             this has the usual limits of signal delivery to threads; namely, that it works
565             at the Perl interpreter level, and cannot actually interrupt blocking system
566             calls.
567              
568             =cut
569              
570             sub kill
571             {
572 2     2 1 3884 my $self = shift;
573 2         24 my ( $signal ) = @_;
574              
575 2 50       56 $self->{process}->kill( $signal ) if $self->{model} eq "fork";
576 2 50       20 threads->object( $self->{tid} )->kill( $signal ) if $self->{model} eq "thread";
577             }
578              
579             =head2 result_future
580              
581             $f = $routine->result_future
582              
583             I
584              
585             Returns a new C<IO::Async::Future> which will complete with the eventual
586             return value or exception when the routine finishes.
587              
588             If the routine finishes with a successful result then this will be the C<done>
589             result of the future. If the routine fails with an exception then this will be
590             the C<fail> result.
591              
592             =cut
593              
594             sub result_future
595             {
596 20     20 1 5902 my $self = shift;
597              
598 20   66     235 return $self->{result_future} //= do {
599 13         59 my $f = $self->loop->new_future;
600             # This future needs to strongly retain $self to ensure it definitely gets
601             # notified
602 13     13   212 $f->on_ready( sub { undef $self } );
  13         778  
603 13         459 $f;
604             };
605             }
606              
607             =head1 AUTHOR
608              
609             Paul Evans
610              
611             =cut
612              
613             0x55AA;