File Coverage

blib/lib/Coro/ProcessPool/Process.pm
Criterion Covered Total %
statement 80 89 89.8
branch 10 18 55.5
condition 3 4 75.0
subroutine 20 22 90.9
pod 0 8 0.0
total 113 141 80.1


line stmt bran cond sub pod time code
1             package Coro::ProcessPool::Process;
2             # ABSTRACT: Manager for worker subprocess
3             $Coro::ProcessPool::Process::VERSION = '0.28';
4 3     3   154174 use strict;
  3         13  
  3         80  
5 3     3   16 use warnings;
  3         5  
  3         67  
6 3     3   14 use Carp;
  3         6  
  3         133  
7 3     3   15 use Coro;
  3         7  
  3         150  
8 3     3   286 use Coro::Countdown;
  3         250  
  3         72  
9 3     3   1301 use Data::UUID;
  3         1921  
  3         165  
10 3     3   19 use POSIX qw(:sys_wait_h);
  3         6  
  3         24  
11 3     3   10000 use Coro::Handle qw(unblock);
  3         129408  
  3         540  
12 3     3   27 use AnyEvent::Util qw(run_cmd portable_pipe);
  3         8  
  3         131  
13 3     3   1682 use Coro::ProcessPool::Util qw(get_command_path get_args encode decode $EOL);
  3         10  
  3         513  
14              
15 3     3   24 use parent 'Exporter';
  3         4  
  3         15  
16             our @EXPORT_OK = qw(worker);
17              
18             my $UUID = Data::UUID->new;
19              
20             sub worker {
21 23     23 0 19751   my %param = @_;
22 23   100     158   my $inc = $param{include} // []; # include directories
23 23         129   my $cmd = get_command_path; # perl path
24 23         72   my $args = get_args(@$inc); # duplicate current -I's along with
25             # explicit includes; also adds command
26             # to start worker
27 23         71   my $exec = "$cmd $args"; # final worker command
28              
29             # Create a pipe for each direction
30 23         136   my ($child_in, $parent_out) = portable_pipe;
31 23         691   my ($parent_in, $child_out) = portable_pipe;
32              
33             # Build instance
34 23         649   my $proc = bless {
35                 pid => undef, # will be set by run_cmd
36                 in => unblock($parent_in), # from child (results)
37                 out => unblock($parent_out), # to child (tasks)
38                 inbox => {}, # uuid -> condvar to signal completion of task
39                 reader => undef, # coro watching child output ($parent_in)
40                 stopped => undef, # condvar to signal when process is terminated
41                 started => AE::cv, # signaled when process has self-identified as ready
42                 counter => 0, # total tasks accepted
43                 pending => Coro::Countdown->new, # pending task counter
44               }, 'Coro::ProcessPool::Process';
45              
46             # Launch worker process without blocking
47               $proc->{stopped} = run_cmd $exec, (
48                 'close_all' => 1,
49                 '$$' => \$proc->{pid},
50                 '>' => $child_out,
51                 '<' => $child_in,
52                 '2>' => sub {
53             # Reemit errors/warnings to the parent's stderr
54 5 50   5   15244       my $err = shift or return; # called once with undef when worker exits
55 0         0       warn "[worker pid:$proc->{pid}] $err\n";
56                 },
57 23         18475   );
58              
59             # Add callback to clean up pipe handles after process exits
60               $proc->{stopped}->cb(sub {
61 5     5   704     $proc->{in}->close;
62 5         179     $proc->{out}->close;
63 23         31879   });
64              
65             # Add watcher for worker output
66               $proc->{reader} = async {
67 23     23   1676     my $proc = shift;
68              
69             # Worker notifies us that it is initialized by sending its pid, followed by
70             # an $EOL. In turn, we wake up any watchers waiting for the process to be
71             # ready.
72 23         45     do {
73 23         194       my $pid = $proc->{in}->readline($EOL);
74 5         767946       chomp $pid;
75 5         81       $proc->{started}->send($pid);
76                 };
77              
78             # Read loop
79 5         185     while (my $line = $proc->{in}->readline($EOL)) {
80 15         4284       my ($id, $error, $data) = decode($line);
81              
82             # Signal watchers with result
83 15 50       41       if (exists $proc->{inbox}{$id}) {
84 15 50       30         if ($error) {
85 0         0           $proc->{inbox}{$id}->croak($data);
86                     } else {
87 15         45           $proc->{inbox}{$id}->send($data);
88                     }
89              
90             # Clean up tracking and decrement pending counter
91 15         203         delete $proc->{inbox}{$id};
92 15         45         $proc->{pending}->down;
93              
94                   } else {
95 0         0         warn "Unexpected message received: $id";
96                   }
97                 }
98 23         837   } $proc;
99              
100 23         1563   return $proc;
101             }
102              
103             sub pid {
104 0     0 0 0   my $proc = shift;
105 0         0   return $proc->{pid};
106             }
107              
108             sub await {
109 5     5 0 12   my $proc = shift;
110 5         32   $proc->{started}->recv;
111             }
112              
113             sub join {
114 5     5 0 147   my $proc = shift;
115 5         27   $proc->{pending}->join; # wait on all pending tasks to complete
116 5         174   $proc->{stopped}->recv; # watch for process termination signal
117             }
118              
119             sub alive {
120 27     27 0 220   my $proc = shift;
121 27 100       144   return 0 unless $proc->{started}->ready; # has process signaled its own readiness?
122 26 100       175   return 0 if $proc->{stopped}->ready; # has process already been stopped?
123 21 50       250   return 1 if waitpid($proc->{pid}, WNOHANG) >= 0; # does the process look alive?
124 0         0   return 0;
125             }
126              
127             sub stop {
128 5     5 0 4105   my $proc = shift;
129 5 50       19   if ($proc->alive) {
130             # Send command to tell worker to self-terminate
131 5         36     $proc->{out}->print(encode('', 'self-terminate', []) . $EOL);
132               }
133             }
134              
135             sub kill {
136 0     0 0 0   my $proc = shift;
137 0 0       0   if ($proc->alive) {
138             # Force the issue
139 0         0     kill('KILL', $proc->{pid});
140               }
141             }
142              
143             sub send {
144 15     15 0 2984   my ($proc, $f, $args) = @_;
145 15 50       53   croak 'subprocess is not running' unless $proc->alive;
146              
147             # Add a watcher to the inbox for this task
148 15         336   my $id = $UUID->create_str;
149 15         499   $proc->{inbox}{$id} = AE::cv;
150              
151             # Send the task to the worker
152 15   50     239   $proc->{out}->print(encode($id, $f, $args || []) . $EOL);
153              
154 15         692   ++$proc->{counter}; # increment count of total tasks accepted
155 15         118   $proc->{pending}->up; # increment counter of pending tasks
156              
157             # Return condvar that will be signaled by the input watcher when the results
158             # for this $id are ready.
159 15         155   return $proc->{inbox}{$id};
160             }
161              
162             1;
163              
164             __END__
165            
166             =pod
167            
168             =encoding UTF-8
169            
170             =head1 NAME
171            
172             Coro::ProcessPool::Process - Manager for worker subprocess
173            
174             =head1 VERSION
175            
176             version 0.28
177            
178             =head1 AUTHOR
179            
180             Jeff Ober <sysread@fastmail.fm>
181            
182             =head1 COPYRIGHT AND LICENSE
183            
184             This software is copyright (c) 2017 by Jeff Ober.
185            
186             This is free software; you can redistribute it and/or modify it under
187             the same terms as the Perl 5 programming language system itself.
188            
189             =cut
190