File Coverage

blib/lib/Process/Async/Manager.pm
Criterion Covered Total %
statement 34 43 79.0
branch 5 14 35.7
condition 4 12 33.3
subroutine 9 10 90.0
pod 4 4 100.0
total 56 83 67.4


line stmt bran cond sub pod time code
1             package Process::Async::Manager;
2             $Process::Async::Manager::VERSION = '0.003';
3 1     1   4 use strict;
  1         1  
  1         28  
4 1     1   3 use warnings;
  1         1  
  1         22  
5              
6 1     1   3 use parent qw(IO::Async::Notifier);
  1         1  
  1         8  
7              
8             =head1 NAME
9              
10             Process::Async::Manager - handle async background process
11              
12             =head1 VERSION
13              
14             version 0.003
15              
16             =head1 SYNOPSIS
17              
18             my $pm = Process::Async::Manager->new;
19             my $child = $pm->spawn(
20             worker_class => 'Some::Worker::Class',
21             );
22             $child->stdio->write('start');
23              
24             =head1 DESCRIPTION
25              
26             Look in examples/ in the source distribution.
27              
28             =cut
29              
30 1     1   48 use curry;
  1         1  
  1         17  
31 1     1   4 use Carp qw(confess);
  1         1  
  1         300  
32              
33             =head1 METHODS
34              
35             =cut
36              
37             =head2 configure
38              
39             Applies our configuration. Currently accepts:
40              
41             =over 4
42              
43             =item * worker - either the name of the subclass used for instantiating a worker,
44             or an existing instance, or a coderef which will return a suitable L
45             instance
46              
47             =item * child - either the name of the subclass used for instantiating a child,
48             or an existing instance, or a coderef which will return a suitable L
49             instance
50              
51             =back
52              
53             =cut
54              
55             sub configure {
56 1     1 1 4083 my ($self, %args) = @_;
57 1 50       10 $self->{worker} = delete $args{worker} if exists $args{worker};
58 1 50       4 $self->{child} = delete $args{child} if exists $args{child};
59 1         8 $self->SUPER::configure(%args);
60             }
61              
62             =head2 worker
63              
64             Accessor for the L generator/class/instance.
65              
66             =cut
67              
68 1     1 1 4 sub worker { shift->{worker} }
69              
70             =head2 child
71              
72             Accessor for the L generator/class/instance.
73              
74             =cut
75              
76 1     1 1 7 sub child { shift->{child} }
77              
78             =head2 spawn
79              
80             Spawn a child. Returns a L instance.
81              
82             Can take worker/child params.
83              
84             =cut
85              
86             sub spawn {
87 1     1 1 825 my ($self, %args) = @_;
88 1 50       11 die "Need to be added to an IO::Async::Loop or IO::Async::Notifier first" unless $self->loop;
89              
90             # Use the same loop subclass in the child process as we're using
91 1         9 my $loop_class = ref($self->loop);
92              
93 1         4 my $worker = delete $args{worker};
94 1   33     5 $worker ||= $self->worker;
95              
96 1         1 my $child = delete $args{child};
97 1   50     4 $child ||= $self->child || 'Process::Async::Child';
      33        
98 1 50 50     5 $child = $child->() if (ref $child // '') eq 'CODE';
99 1 50       14 $child = $child->new unless ref $child;
100              
101 1         92 $self->debug_printf("Starting %s worker via %s child with %s loop", ref($worker), ref($child), $loop_class);
102              
103             # Provide the code and a basic STDIO handler
104             $child->configure(
105             stdio => {
106             via => 'pipe_rdwr',
107             on_read => $child->curry::on_read,
108             },
109             code => sub {
110             # (from here, we're in the fork)
111 0     0   0 my $loop = $IO::Async::Loop::ONE_TRUE_LOOP = $loop_class->new;
112 0         0 $self->debug_printf("Loop %s initialised", $loop);
113 0 0 0     0 $worker = $worker->() if (ref $worker // '') eq 'CODE';
114 0 0       0 $worker = $worker->new unless ref $worker;
115              
116 0         0 $loop->add(
117             $worker
118             );
119 0         0 $self->debug_printf("Running worker %s", $worker);
120 0         0 my $exit = $worker->run($loop);
121 0         0 $self->debug_printf("Worker %s ->run completed with %d", $worker, $exit);
122 0         0 return $exit;
123             }
124 1         11 );
125 1         115 $self->add_child($child);
126 1         25642 $child
127             }
128              
129             1;
130              
131             __END__