File Coverage

blib/lib/IO/Async/Routine.pm
Criterion Covered Total %
statement 141 191 73.8
branch 31 70 44.2
condition 15 29 51.7
subroutine 26 31 83.8
pod 5 5 100.0
total 218 326 66.8


line stmt bran cond sub pod time code
1             # You may distribute under the terms of either the GNU General Public License
2             # or the Artistic License (the same terms as Perl itself)
3             #
4             # (C) Paul Evans, 2012-2021 -- leonerd@leonerd.org.uk
5              
6             package IO::Async::Routine;
7              
8 12     12   141982 use strict;
  12         24  
  12         380  
9 12     12   97 use warnings;
  12         21  
  12         556  
10              
11             our $VERSION = '0.801';
12              
13 12     12   75 use base qw( IO::Async::Notifier );
  12         21  
  12         2103  
14              
15 12     12   91 use Carp;
  12         24  
  12         745  
16              
17 12     12   2433 use IO::Async::OS;
  12         25  
  12         366  
18 12     12   6968 use IO::Async::Process;
  12         32  
  12         397  
19              
20 12     12   992 use Struct::Dumb qw( readonly_struct );
  12         3527  
  12         65  
21              
22             =head1 NAME
23              
24             C<IO::Async::Routine> - execute code in an independent sub-process or thread
25              
26             =head1 SYNOPSIS
27              
28             use IO::Async::Routine;
29             use IO::Async::Channel;
30              
31             use IO::Async::Loop;
32             my $loop = IO::Async::Loop->new;
33              
34             my $nums_ch = IO::Async::Channel->new;
35             my $ret_ch = IO::Async::Channel->new;
36              
37             my $routine = IO::Async::Routine->new(
38             channels_in => [ $nums_ch ],
39             channels_out => [ $ret_ch ],
40              
41             code => sub {
42             my @nums = @{ $nums_ch->recv };
43             my $ret = 0; $ret += $_ for @nums;
44              
45             # Can only send references
46             $ret_ch->send( \$ret );
47             },
48              
49             on_finish => sub {
50             say "The routine aborted early - $_[-1]";
51             $loop->stop;
52             },
53             );
54              
55             $loop->add( $routine );
56              
57             $nums_ch->send( [ 10, 20, 30 ] );
58             $ret_ch->recv(
59             on_recv => sub {
60             my ( $ch, $totalref ) = @_;
61             say "The total of 10, 20, 30 is: $$totalref";
62             $loop->stop;
63             }
64             );
65              
66             $loop->run;
67              
68             =head1 DESCRIPTION
69              
70             This L<IO::Async::Notifier> contains a body of code and executes it in a
71             sub-process or thread, allowing it to act independently of the main program.
72             Once set up, all communication with the code happens by values passed into or
73             out of the Routine via L<IO::Async::Channel> objects.
74              
75             The code contained within the Routine is free to make blocking calls without
76             stalling the rest of the program. This makes it useful for using existing code
77             which has no option not to block within an L<IO::Async>-based program.
78              
79             To create asynchronous wrappers of functions that return a value based only on
80             their arguments, and do not generally maintain state within the process it may
81             be more convenient to use an L<IO::Async::Function> instead, which uses an
82             C<IO::Async::Routine> to contain the body of the function and manages the
83             Channels itself.
84              
85             =head2 Models
86              
87             A choice of detachment model is available. Each has various advantages and
88             disadvantages. Not all of them may be available on a particular system.
89              
90             =head3 The C<fork> model
91              
92             The code in this model runs within its own process, created by calling
93             C<fork()> from the main process. It is isolated from the rest of the program
94             in terms of memory, CPU time, and other resources. Because it is started
95             using C<fork()>, the initial process state is a clone of the main process.
96              
97             This model performs well on UNIX-like operating systems which possess a true
98             native C<fork()> system call, but is not available on C<MSWin32> for example,
99             because the operating system does not provide full fork-like semantics.
100              
101             =head3 The C<thread> model
102              
103             The code in this model runs inside a separate thread within the main process.
104             It therefore shares memory and other resources such as open filehandles with
105             the main thread. As with the C<fork> model, the initial thread state is cloned
106             from the main controlling thread.
107              
108             This model is only available on perls built to support threading.
109              
110             =head3 The C<spawn> model
111              
112             I<Since version 0.79.>
113              
114             The code in this model runs within its own freshly-created process running
115             another copy of the perl interpreter. Similar to the C<fork> model it
116             therefore has its own memory, CPU time, and other resources. However, since it
117             is started freshly rather than by cloning the main process, it starts up in a
118             clean state, without any shared resources from its parent.
119              
120             Since this model creates a new fresh process rather than sharing existing
121             state, it cannot use the C<code> argument to specify the routine body; it must
122             instead use only the C<module> and C<func> arguments.
123              
124             In the current implementation this model requires exactly one input channel
125             and exactly one output channel; both must be present, and there cannot be more
126             than one of either.
127              
128             =cut
129              
130             =head1 EVENTS
131              
132             =head2 on_finish $exitcode
133              
134             For C<fork()>-based Routines, this is invoked after the process has exited and
135             is passed the raw exitcode status.
136              
137             =head2 on_finish $type, @result
138              
139             For thread-based Routines, this is invoked after the thread has returned from
140             its code block and is passed the C<on_joined> result.
141              
142             As the behaviour of these events differs per model, it may be more convenient
143             to use C<on_return> and C<on_die> instead.
144              
145             =head2 on_return $result
146              
147             Invoked if the code block returns normally. Note that C<fork()>-based Routines
148             can only transport an integer result between 0 and 255, as this is the actual
149             C<exit()> value.
150              
151             =head2 on_die $exception
152              
153             Invoked if the code block fails with an exception.
154              
155             =cut
156              
157             =head1 PARAMETERS
158              
159             The following named parameters may be passed to C<new> or C<configure>:
160              
161             =head2 model => "fork" | "thread" | "spawn"
162              
163             Optional. Defines how the routine will detach itself from the main process.
164             See the L</Models> section above for more detail.
165              
166             If the model is not specified, the environment variable
167             C<IO_ASYNC_ROUTINE_MODEL> is used to pick a default. If that isn't defined,
168             C<fork> is preferred if it is available, otherwise C<thread>.
169              
170             =head2 channels_in => ARRAY of IO::Async::Channel
171              
172             ARRAY reference of L<IO::Async::Channel> objects to set up for passing values
173             in to the Routine.
174              
175             =head2 channels_out => ARRAY of IO::Async::Channel
176              
177             ARRAY reference of L<IO::Async::Channel> objects to set up for passing values
178             out of the Routine.
179              
180             =head2 code => CODE
181              
182             CODE reference to the body of the Routine, to execute once the channels are
183             set up.
184              
185             When using the C<spawn> model, this is not permitted; you must use C<module>
186             and C<func> instead.
187              
188             =head2 module => STRING
189              
190             =head2 func => STRING
191              
192             I<Since version 0.79.>
193              
194             An alternative to the C<code> argument, which names a module to load and a
195             function to call within it. C<module> should give a perl module name (i.e.
196             C<Some::Name>, not a filename like F<Some/Name.pm>), and C<func> should give
197             the basename of a function within that module (i.e. without the module name
198             prefixed). It will be invoked as the main code body of the object, and passed
199             in a list of all the channels; first the input ones then the output ones.
200              
201             module::func( @channels_in, @channels_out )
202              
203             =head2 setup => ARRAY
204              
205             Optional. For C<fork()>-based Routines, gives a reference to an array to pass
206             to the underlying C<Loop> C<fork_child> method. Ignored for thread-based
207             Routines.
208              
209             =cut
210              
211 12 0       21564 use constant PREFERRED_MODEL =>
    50          
212             IO::Async::OS->HAVE_POSIX_FORK ? "fork" :
213             IO::Async::OS->HAVE_THREADS ? "thread" :
214 12     12   1260 die "No viable Routine models";
  12         25  
215              
216             sub _init
217             {
218 74     74   141 my $self = shift;
219 74         139 my ( $params ) = @_;
220              
221 74   50     978 $params->{model} ||= $ENV{IO_ASYNC_ROUTINE_MODEL} || PREFERRED_MODEL;
      66        
222              
223 74         316 $self->SUPER::_init( @_ );
224             }
225              
226             my %SETUP_CODE;
227              
228             sub configure
229             {
230 74     74 1 143 my $self = shift;
231 74         323 my %params = @_;
232              
233             # TODO: Can only reconfigure when not running
234 74         246 foreach (qw( channels_in channels_out code module func setup on_finish on_return on_die )) {
235 666 100       1636 $self->{$_} = delete $params{$_} if exists $params{$_};
236             }
237              
238             defined $self->{code} and defined $self->{func} and
239 74 50 66     743 croak "Cannot ->configure both 'code' and 'func'";
240             defined $self->{func} and !defined $self->{module} and
241 74 50 66     398 croak "'func' parameter requires a 'module' as well";
242              
243 74 50       249 if( defined( my $model = delete $params{model} ) ) {
244 74 50 66     560 ( $SETUP_CODE{$model} ||= $self->can( "_setup_$model" ) )
245             or die "Unrecognised Routine model $model";
246              
247             # TODO: optional plugin "configure" check here?
248 74 50 66     1185 $model eq "fork" and !IO::Async::OS->HAVE_POSIX_FORK and
249             croak "Cannot use 'fork' model as fork() is not available";
250 74 50 33     311 $model eq "thread" and !IO::Async::OS->HAVE_THREADS and
251             croak "Cannot use 'thread' model as threads are not available";
252              
253 74         209 $self->{model} = $model;
254             }
255              
256 74         313 $self->SUPER::configure( %params );
257             }
258              
259             sub _add_to_loop
260             {
261 74     74   180 my $self = shift;
262 74         178 my ( $loop ) = @_;
263 74         257 $self->SUPER::_add_to_loop( $loop );
264              
265 74         153 my $model = $self->{model};
266              
267 74 50 33     307 my $code = ( $SETUP_CODE{$model} ||= $self->can( "_setup_$model" ) )
268             or die "Unrecognised Routine model $model";
269              
270 74         283 $self->$code();
271             }
272              
273             readonly_struct ChannelSetup => [qw( chan myfd otherfd )];
274              
275             sub _create_channels_in
276             {
277 74     74   212 my $self = shift;
278              
279 74         140 my @channels_in;
280              
281 74 100       152 foreach my $ch ( @{ $self->{channels_in} || [] } ) {
  74         402  
282 66         219 my ( $rd, $wr );
283 66 100       506 unless( $rd = $ch->_extract_read_handle ) {
284 65         818 ( $rd, $wr ) = IO::Async::OS->pipepair;
285             }
286 66         337 push @channels_in, ChannelSetup( $ch, $wr, $rd );
287             }
288              
289 74         996 return @channels_in;
290             }
291              
292             sub _create_channels_out
293             {
294 74     74   270 my $self = shift;
295              
296 74         114 my @channels_out;
297              
298 74 100       153 foreach my $ch ( @{ $self->{channels_out} || [] } ) {
  74         618  
299 67         193 my ( $rd, $wr );
300 67 50       278 unless( $wr = $ch->_extract_write_handle ) {
301 67         396 ( $rd, $wr ) = IO::Async::OS->pipepair;
302             }
303 67         315 push @channels_out, ChannelSetup( $ch, $rd, $wr );
304             }
305              
306 74         748 return @channels_out;
307             }
308              
309             sub _adopt_channels_in
310             {
311 74     74   136 my $self = shift;
312 74         196 my ( @channels_in ) = @_;
313              
314 74         183 foreach ( @channels_in ) {
315 66         225 my $ch = $_->chan;
316 66         484 $ch->setup_async_mode( write_handle => $_->myfd );
317 66 100       245 $self->add_child( $ch ) unless $ch->parent;
318             }
319             }
320              
321             sub _adopt_channels_out
322             {
323 74     74   115 my $self = shift;
324 74         160 my ( @channels_out ) = @_;
325              
326 74         170 foreach ( @channels_out ) {
327 67         211 my $ch = $_->chan;
328 67         404 $ch->setup_async_mode( read_handle => $_->myfd );
329 67 50       236 $self->add_child( $ch ) unless $ch->parent;
330             }
331             }
332              
333             sub _setup_fork
334             {
335 70     70   129 my $self = shift;
336              
337 70         454 my @channels_in = $self->_create_channels_in;
338 70         594 my @channels_out = $self->_create_channels_out;
339              
340 70         162 my $code = $self->{code};
341              
342 70         136 my $module = $self->{module};
343 70         111 my $func = $self->{func};
344              
345 70         161 my @setup = map { $_->otherfd => "keep" } @channels_in, @channels_out;
  125         658  
346              
347 70         487 my $setup = $self->{setup};
348 70 100       203 push @setup, @$setup if $setup;
349              
350             my $process = IO::Async::Process->new(
351             setup => \@setup,
352             code => sub {
353 1     1   14 foreach ( @channels_in, @channels_out ) {
354 2         211 $_->chan->setup_sync_mode( $_->otherfd );
355             }
356              
357 1 50       58 if( defined $module ) {
358 0         0 ( my $file = "$module.pm" ) =~ s{::}{/}g;
359 0         0 require $file;
360              
361 0 0       0 $code = $module->can( $func ) or
362             die "Module '$module' has no '$func'\n";
363             }
364              
365 1         6 my $ret = $code->( map { $_->chan } @channels_in, @channels_out );
  2         12  
366              
367 0         0 foreach ( @channels_in, @channels_out ) {
368 0         0 $_->chan->close;
369             }
370              
371 0         0 return $ret;
372             },
373             on_finish => $self->_replace_weakself( sub {
374 9 50   9   34 my $self = shift or return;
375 9         18 my ( $exitcode ) = @_;
376 9         72 $self->maybe_invoke_event( on_finish => $exitcode );
377              
378 9 50       46 unless( $exitcode & 0x7f ) {
379 9         36 $self->maybe_invoke_event( on_return => ($exitcode >> 8) );
380 9         68 $self->result_future->done( $exitcode >> 8 );
381             }
382             }),
383             on_exception => $self->_replace_weakself( sub {
384 4 50   4   34 my $self = shift or return;
385 4         24 my ( $exception, $errno, $exitcode ) = @_;
386              
387 4         64 $self->maybe_invoke_event( on_die => $exception );
388 4         62 $self->result_future->fail( $exception, routine => );
389 70         1138 }),
390             );
391              
392 70         299 $self->_adopt_channels_in ( @channels_in );
393 70         244 $self->_adopt_channels_out( @channels_out );
394              
395 70         445 $self->add_child( $self->{process} = $process );
396 69         1288 $self->{id} = "P" . $process->pid;
397              
398 69         1492 $_->otherfd->close for @channels_in, @channels_out;
399             }
400              
401             sub _setup_thread
402             {
403 0     0   0 my $self = shift;
404              
405 0         0 my @channels_in = $self->_create_channels_in;
406 0         0 my @channels_out = $self->_create_channels_out;
407              
408 0         0 my $code = $self->{code};
409              
410 0         0 my $module = $self->{module};
411 0         0 my $func = $self->{func};
412              
413             my $tid = $self->loop->create_thread(
414             code => sub {
415 0     0   0 foreach ( @channels_in, @channels_out ) {
416 0         0 $_->chan->setup_sync_mode( $_->otherfd );
417 0   0     0 defined and $_->close for $_->myfd;
418             }
419              
420 0 0       0 if( defined $func ) {
421 0         0 ( my $file = "$module.pm" ) =~ s{::}{/}g;
422 0         0 require $file;
423              
424 0 0       0 $code = $module->can( $func ) or
425             die "Module '$module' has no '$func'\n";
426             }
427              
428 0         0 my $ret = $code->( map { $_->chan } @channels_in, @channels_out );
  0         0  
429              
430 0         0 foreach ( @channels_in, @channels_out ) {
431 0         0 $_->chan->close;
432             }
433              
434 0         0 return $ret;
435             },
436             on_joined => $self->_capture_weakself( sub {
437 0 0   0   0 my $self = shift or return;
438 0         0 my ( $ev, @result ) = @_;
439 0         0 $self->maybe_invoke_event( on_finish => @_ );
440              
441 0 0       0 if( $ev eq "return" ) {
442 0         0 $self->maybe_invoke_event( on_return => @result );
443 0         0 $self->result_future->done( @result );
444             }
445 0 0       0 if( $ev eq "died" ) {
446 0         0 $self->maybe_invoke_event( on_die => $result[0] );
447 0         0 $self->result_future->fail( $result[0], routine => );
448             }
449              
450 0         0 delete $self->{tid};
451 0         0 }),
452             );
453              
454 0         0 $self->{tid} = $tid;
455 0         0 $self->{id} = "T" . $tid;
456              
457 0         0 $self->_adopt_channels_in ( @channels_in );
458 0         0 $self->_adopt_channels_out( @channels_out );
459              
460 0         0 $_->otherfd->close for @channels_in, @channels_out;
461             }
462              
463             # The injected program that goes into spawn mode
464 12     12   181 use constant PERL_RUNNER => <<'EOF';
  12         34  
  12         7951  
465             ( my ( $module, $func ), @INC ) = @ARGV;
466             ( my $file = "$module.pm" ) =~ s{::}{/}g;
467             require $file;
468             my $code = $module->can( $func ) or die "Module '$module' has no '$func'\n";
469             require IO::Async::Channel;
470             exit $code->( IO::Async::Channel->new_stdin, IO::Async::Channel->new_stdout );
471             EOF
472              
473             sub _setup_spawn
474             {
475 4     4   28 my $self = shift;
476              
477             $self->{code} and
478 4 50       60 die "Cannot run IO::Async::Routine in 'spawn' with code\n";
479              
480 4 50       30 @{ $self->{channels_in} } == 1 or
  4         46  
481             die "IO::Async::Routine in 'spawn' mode requires exactly one input channel\n";
482 4 50       22 @{ $self->{channels_out} } == 1 or
  4         52  
483             die "IO::Async::Routine in 'spawn' mode requires exactly one output channel\n";
484              
485 4         62 my @channels_in = $self->_create_channels_in;
486 4         96 my @channels_out = $self->_create_channels_out;
487              
488 4         12 my $module = $self->{module};
489 4         22 my $func = $self->{func};
490              
491             my $process = IO::Async::Process->new(
492             setup => [
493             stdin => $channels_in[0]->otherfd,
494             stdout => $channels_out[0]->otherfd,
495             ],
496 42         640 command => [ $^X, "-E", PERL_RUNNER, $module, $func, grep { !ref } @INC ],
497             on_finish => $self->_replace_weakself( sub {
498 0 0   0   0 my $self = shift or return;
499 0         0 my ( $exitcode ) = @_;
500 0         0 $self->maybe_invoke_event( on_finish => $exitcode );
501              
502 0 0       0 unless( $exitcode & 0x7f ) {
503 0         0 $self->maybe_invoke_event( on_return => ($exitcode >> 8) );
504 0         0 $self->result_future->done( $exitcode >> 8 );
505             }
506             }),
507             on_exception => $self->_replace_weakself( sub {
508 0 0   0   0 my $self = shift or return;
509 0         0 my ( $exception, $errno, $exitcode ) = @_;
510              
511 0         0 $self->maybe_invoke_event( on_die => $exception );
512 0         0 $self->result_future->fail( $exception, routine => );
513 4         56 }),
514             );
515              
516 4         42 $self->_adopt_channels_in ( @channels_in );
517 4         26 $self->_adopt_channels_out( @channels_out );
518              
519 4         66 $self->add_child( $self->{process} = $process );
520 2         248 $self->{id} = "P" . $process->pid;
521              
522 2         80 $_->otherfd->close for @channels_in, @channels_out;
523             }
524              
525             =head1 METHODS
526              
527             =cut
528              
529             =head2 id
530              
531             $id = $routine->id
532              
533             Returns an ID string that uniquely identifies the Routine out of all the
534             currently-running ones. (The ID of already-exited Routines may be reused,
535             however.)
536              
537             =cut
538              
539             sub id
540             {
541 101     101 1 222 my $self = shift;
542 101         1897 return $self->{id};
543             }
544              
545             =head2 model
546              
547             $model = $routine->model
548              
549             Returns the detachment model in use by the Routine.
550              
551             =cut
552              
553             sub model
554             {
555 2     2 1 2792 my $self = shift;
556 2         46 return $self->{model};
557             }
558              
559             =head2 kill
560              
561             $routine->kill( $signal )
562              
563             Sends the specified signal to the routine code. This is either implemented by
564             C<CORE::kill()> or C<threads::kill> as required. Note that in the thread case
565             this has the usual limits of signal delivery to threads; namely, that it works
566             at the Perl interpreter level, and cannot actually interrupt blocking system
567             calls.
568              
569             =cut
570              
571             sub kill
572             {
573 2     2 1 4246 my $self = shift;
574 2         26 my ( $signal ) = @_;
575              
576 2 50       64 $self->{process}->kill( $signal ) if $self->{model} eq "fork";
577 2 50       50 threads->object( $self->{tid} )->kill( $signal ) if $self->{model} eq "thread";
578             }
579              
580             =head2 result_future
581              
582             $f = $routine->result_future
583              
584             I<Since version 0.75.>
585              
586             Returns a new C<IO::Async::Future> which will complete with the eventual
587             return value or exception when the routine finishes.
588              
589             If the routine finishes with a successful result then this will be the C<done>
590             result of the future. If the routine fails with an exception then this will be
591             the C<fail> result.
592              
593             =cut
594              
595             sub result_future
596             {
597 20     20 1 5073 my $self = shift;
598              
599 20   66     211 return $self->{result_future} //= do {
600 13         67 my $f = $self->loop->new_future;
601             # This future needs to strongly retain $self to ensure it definitely gets
602             # notified
603 13     13   167 $f->on_ready( sub { undef $self } );
  13         776  
604 13         376 $f;
605             };
606             }
607              
608             =head1 AUTHOR
609              
610             Paul Evans <leonerd@leonerd.org.uk>
611              
612             =cut
613              
614             0x55AA;