File Coverage

blib/lib/Coro/ProcessPool.pm
Criterion Covered Total %
statement 33 79 41.7
branch 0 4 0.0
condition 2 6 33.3
subroutine 11 19 57.8
pod 6 7 85.7
total 52 115 45.2


line stmt bran cond sub pod time code
1             package Coro::ProcessPool;
2             # ABSTRACT: An asynchronous pool of perl processes
3             $Coro::ProcessPool::VERSION = '0.28';
4 2     2   322233 use strict;
  2         24  
  2         58  
5 2     2   10 use warnings;
  2         4  
  2         50  
6 2     2   11 use Coro;
  2         4  
  2         119  
7 2     2   614 use AnyEvent;
  2         3949  
  2         58  
8 2     2   1121 use Coro::Countdown;
  2         1705  
  2         63  
9 2     2   1164 use Coro::ProcessPool::Process qw(worker);
  2         6  
  2         131  
10 2     2   18 use Coro::ProcessPool::Util qw($CPUS);
  2         5  
  2         1623  
11              
12             sub new{
13 2     2 0 5508   my ($class, %param) = @_;
14              
15               my $self = bless{
16                 max_procs => $param{max_procs} || $CPUS,
17                 max_reqs => $param{max_reqs},
18                 include => $param{include},
19 2   66     27     queue => Coro::Channel->new,
20                 pool => Coro::Channel->new,
21               }, $class;
22              
23             # Initialize worker processes
24 2         3293   for (1 .. $self->{max_procs}) {
25 18         412     $self->{pool}->put($self->_proc);
26               }
27              
28             # Start pool worker
29 2         117   $self->{worker} = async(\&_worker, $self);
30              
31 2         72   return $self;
32             }
33              
34             sub _proc{
35 18     18   67   my $self = shift;
36 18         65   worker(include => $self->{include});
37             }
38              
39             sub _worker{
40 0     0   0   my $self = shift;
41              
42             # Read tasks from the queue
43 0         0   while (my $task = $self->{queue}->get) {
44 0         0     my ($caller, $f, @args) = @$task;
45              
46             # Wait for a worker process to become available
47                 WORKER:
48 0         0     my $ps = $self->{pool}->get;
49              
50             # If the worker has serviced at least as many requests as our
51             # limit, stop the worker process, put a new one into the pool,
52             # and REDO FROM START.
53 0 0 0     0     if ($self->{max_reqs} && $ps->{counter} >= $self->{max_reqs}) {
54             # No need to hold up the next task to deal with this
55                   async_pool{
56 0     0   0         my $ps = shift;
57 0         0         $ps->stop;
58 0         0         $ps->join;
59 0         0       } $ps;
60              
61 0         0       $self->{pool}->put($self->_proc);
62 0         0       goto WORKER;
63                 }
64              
65             # Ensure our worker is completely initialized
66 0         0     $ps->await;
67              
68             # Send task to our worker process
69 0         0     my $cv = $ps->send($f, \@args);
70              
71             # Schedule a thread to watch for the result and deliver it to the caller
72                 async_pool{
73 0     0   0       my ($k, $cv) = @_;
74 0         0       my $ret = eval{ $cv->recv };
  0         0  
75 0 0       0       $@ ? $k->croak($@) : $k->send($ret);
76 0         0     } $caller, $cv;
77              
78             # Return the worker to the pool
79 0         0     $self->{pool}->put($ps);
80               }
81              
82             # The queue is shut down and no tasks remain. Notify workers to shut down.
83 0         0   my @procs;
84 0         0   $self->{pool}->shutdown;
85 0         0   while (my $ps = $self->{pool}->get) {
86 0         0     push @procs, $ps;
87               }
88              
89             # Wait for all workers to self-terminate
90 0         0   $_->await foreach @procs; # some workers might have just been started
91 0         0   $_->stop foreach @procs; # stop the workers
92 0         0   $_->join foreach @procs; # wait on the process to completely terminate
93             }
94              
95             sub shutdown{
96 1     1 1 305   my $self = shift;
97 1         12   $self->{queue}->shutdown;
98             }
99              
100             sub join{
101 1     1 1 22   my $self = shift;
102 1         108   $self->{worker}->join;
103             }
104              
105             sub defer{
106 0     0 1     my $self = shift;
107 0             my $cv = AE::cv;
108 0             $self->{queue}->put([$cv, @_]);
109 0             return $cv;
110             }
111              
112             sub process{
113 0     0 1     my $self = shift;
114 0             $self->defer(@_)->recv;
115             }
116              
117             sub map {
118 0     0 1     my ($self, $f, @args) = @_;
119              
120             # Inverse semaphore to track pending requests
121 0             my $rem = new Coro::Countdown;
122              
123             # Queue each argument and store as an ordered list to preserve original
124             # ordering of the argments
125               my @cvs = map {
126 0               $rem->up;
  0            
127 0               $self->defer($f, $_);
128               } @args;
129              
130             # Collect results, retaining original ordering by respecting the original
131             # list index
132 0             my @res;
133 0             foreach my $i (0 .. $#args) {
134                 async_pool {
135 0     0           $res[$i] = $_[0]->recv;
136 0                 $rem->down;
137 0               } $cvs[$i];
138               }
139              
140             # Wait for all requests to complete and return the result
141 0             $rem->join;
142 0             return @res;
143             }
144              
145             sub pipeline {
146 0     0 1     my $self = shift;
147 0             return Coro::ProcessPool::Pipeline->new(pool => $self, @_);
148             }
149              
150              
151             1;
152              
153             __END__
154            
155             =pod
156            
157             =encoding UTF-8
158            
159             =head1 NAME
160            
161             Coro::ProcessPool - An asynchronous pool of perl processes
162            
163             =head1 VERSION
164            
165             version 0.28
166            
167             =head1 SYNOPSIS
168            
169             use Coro::ProcessPool;
170             use Coro;
171            
172             my $pool = Coro::ProcessPool->new(
173             max_procs => 4,
174             max_reqs => 100,
175             include => ['/path/to/my/task/classes', '/path/to/other/packages'],
176             );
177            
178             my $double = sub { $_[0] * 2 };
179            
180             #-----------------------------------------------------------------------
181             # Process in sequence, waiting for each result in turn
182             #-----------------------------------------------------------------------
183             my %result;
184             foreach my $i (1 .. 1000) {
185             $result{$i} = $pool->process($double, $i);
186             }
187            
188             #-----------------------------------------------------------------------
189             # Process as a batch
190             #-----------------------------------------------------------------------
191             my @results = $pool->map($double, 1 .. 1000);
192            
193             #-----------------------------------------------------------------------
194             # Defer waiting for result
195             #-----------------------------------------------------------------------
196             my %deferred;
197            
198             $deferred{$_} = $pool->defer($double, $_)
199             foreach 1 .. 1000;
200            
201             # Later
202             foreach my $i (keys %deferred) {
203             print "$i = " . $deferred{$i}->recv . "\n";
204             }
205            
206             #-----------------------------------------------------------------------
207             # Use a "task class" implementing 'new' and 'run'
208             #-----------------------------------------------------------------------
209             my $result = $pool->process('Task::Doubler', 21);
210            
211             #-----------------------------------------------------------------------
212             # Pipelines (work queues)
213             #-----------------------------------------------------------------------
214             my $pipe = $pool->pipeline;
215            
216             # Start producer thread to queue tasks
217             my $producer = async {
218             while (my $task = get_next_task()) {
219             $pipe->queue('Some::TaskClass', $task);
220             }
221            
222             # Let the pipeline know no more tasks are coming
223             $pipe->shutdown;
224             };
225            
226             # Collect the results of each task as they are received
227             while (my $result = $pipe->next) {
228             do_stuff_with($result);
229             }
230            
231             $pool->shutdown;
232            
233             =head1 DESCRIPTION
234            
235             Processes tasks using a pool of external Perl processes.
236            
237             =head1 CONSTRUCTOR
238            
239             my $pool = Coro::ProcessPool->new(
240             max_procs => 4,
241             max_reqs => 100,
242             include => ['path/to/my/packages', 'some/more/packages'],
243             );
244            
245             =head2 max_procs
246            
247             The maximum number of processes to run within the process pool. Defaults
248             to the number of CPUs on the system.
249            
250             =head2 max_reqs
251            
252             The maximum number of tasks a worker process may run before being terminated
253             and replaced with a fresh process. This is useful for tasks that might leak
254             memory over time.
255            
256             =head2 include
257            
258             An optional array ref of directory paths to prepend to the set of directories
259             the worker process will use to find Perl packages.
260            
261             =head1 METHODS
262            
263             =head2 shutdown
264            
265             Tells the pool to terminate after all pending tasks have been completed. Note
266             that this does not prevent new tasks from being queued or even processed. Once
267             called, use L</join> to safely wait until the final task has completed and the
268             pool is no longer running.
269            
270             =head2 join
271            
272             Cedes control to the event loop until the pool is shut down and has completed
273             all tasks. If called I<before> L</shutdown>, take care to ensure that another
274             thread is responsible for shutting down the pool.
275            
276             =head2 defer
277            
278             Queues a task to be processed by the pool. Tasks may be specified in either of two
279             forms, as a code ref or the fully qualified name of a perl class which
280             implements two methods, C<new> and C<run>. Any remaining arguments to C<defer>
281             are passed unchanged to the code ref or the C<new> method of the task class.
282            
283             C<defer> will immediately return an L<AnyEvent/condvar> that will wait for and
284             return the result of the task (or croak if the task generated an error).
285            
286             # Using a code ref
287             my $cv = $pool->defer(\&func, $arg1, $arg2, $arg3);
288             my $result = $cv->recv;
289            
290             # With a task class
291             my $cv = $pool->defer('Some::Task::Class', $arg1, $arg2, $arg3);
292             my $result = $cv->recv;
293            
294             =head2 process
295            
296             Calls defer and immediately calls C<recv> on the returned condvar, returning
297             the result. This is useful if your workflow includes multiple threads which
298             share the same pool. All arguments are passed unchanged to C<defer>.
299            
300             =head2 map
301            
302             Like perl's C<map>, applies a code ref to a list of arguments. This method will
303             cede until all results have been returned by the pool, returning the result as
304             a list. The order of arguments and results is preserved as expected.
305            
306             my @results = $pool->map(\&func, $arg1, $arg2, $arg3);
307            
308             =head2 pipeline
309            
310             Returns a L<Coro::ProcessPool::Pipeline> object which can be used to pipe
311             requests through to the process pool. Results then come out the other end of
312             the pipe, not necessarily in the order in which they were queued. It is up to
313             the calling code to perform task accounting (for example, by passing an id in
314             as one of the arguments to the task class).
315            
316             my $pipe = $pool->pipeline;
317            
318             my $producer = async {
319             foreach my $args (@tasks) {
320             $pipe->queue('Some::Class', $args);
321             }
322            
323             $pipe->shutdown;
324             };
325            
326             while (my $result = $pipe->next) {
327             ...
328             }
329            
330             All arguments to C<pipeline()> are passed transparently to the constructor of
331             L<Coro::ProcessPool::Pipeline>. There is no limit to the number of pipelines
332             which may be created for a pool.
333            
334             =head1 A NOTE ABOUT IMPORTS AND CLOSURES
335            
336             Code refs are serialized using L<Data::Dump::Streamer>, allowing closed over
337             variables to be available to the code being called in the sub-process. Mutated
338             variables are I<not> updated when the result is returned.
339            
340             See L<Data::Dump::Streamer/Caveats-Dumping-Closures-(CODE-Refs)> for important
341             notes regarding closures.
342            
343             =head2 Use versus require
344            
345             The C<use> pragma is run at compile time, whereas C<require> is evaluated at
346             runtime. Because of this, the use of C<use> in code passed directly to the
347             C<process> method can fail in the worker process because the C<use> statement
348             has already been evaluated in the parent process when the calling code was
349             compiled.
350            
351             This will not work:
352            
353             $pool->process(sub {
354             use Foo;
355             my $foo = Foo->new();
356             });
357            
358             This will work:
359            
360             $pool->process(sub {
361             require Foo;
362             my $foo = Foo->new();
363             });
364            
365             If C<use> is necessary (for example, to import a method or transform the
366             calling code via import), it is recommended to move the code into its own
367             module (or to explicitly call require and import in the subroutine), which can
368             then be called in the anonymous routine:
369            
370             package Bar;
371            
372             use Foo;
373            
374             sub dostuff {
375             ...
376             }
377            
378             Then, in your caller:
379            
380             $pool->process(sub {
381             require Bar;
382             Bar::dostuff();
383             });
384            
385             Alternately, a task class may be used if dependency management is causing a
386             headache:
387            
388             my $result = $pool->process('Task::Class', @args);
389            
390             =head1 COMPATIBILITY
391            
392             C<Coro::ProcessPool> will likely break on Win32 due to missing support for
393             non-blocking file descriptors (Win32 can only call C<select> and C<poll> on
394             actual network sockets). Without rewriting this as a network server, which
395             would impact performance and be really annoying, it is likely this module will
396             not support Win32 in the near future.
397            
398             The following modules will get you started if you wish to explore a synchronous
399             process pool on Windows:
400            
401             =over
402            
403             =item L<Win32::Process>
404            
405             =item L<Win32::IPC>
406            
407             =item L<Win32::Pipe>
408            
409             =back
410            
411             =head1 SEE ALSO
412            
413             =over
414            
415             =item L<Coro>
416            
417             =item L<AnyEvent/condvar>
418            
419             =back
420            
421             =head1 AUTHOR
422            
423             Jeff Ober <sysread@fastmail.fm>
424            
425             =head1 COPYRIGHT AND LICENSE
426            
427             This software is copyright (c) 2017 by Jeff Ober.
428            
429             This is free software; you can redistribute it and/or modify it under
430             the same terms as the Perl 5 programming language system itself.
431            
432             =cut
433