File Coverage

blib/lib/Coro/ProcessPool.pm
Criterion Covered Total %
statement 34 39 87.1
branch n/a
condition n/a
subroutine 10 12 83.3
pod 5 6 83.3
total 49 57 85.9


line stmt bran cond sub pod time code
1             package Coro::ProcessPool;
2             # ABSTRACT: An asynchronous pool of perl processes
3             $Coro::ProcessPool::VERSION = '0.29';
4 2     2   233329 use strict;
  2         18  
  2         50  
5 2     2   9 use warnings;
  2         4  
  2         45  
6 2     2   8 use Coro;
  2         3  
  2         105  
7 2     2   471 use Coro::Countdown;
  2         498  
  2         49  
8 2     2   487 use AnyEvent::ProcessPool;
  2         246453  
  2         506  
9              
10             sub new {
11 7     7 0 144998   my ($class, %param) = @_;
12              
13               my $pool = AnyEvent::ProcessPool->new(
14                 workers => $param{max_procs},
15                 limit => $param{max_reqs},
16 7         72   );
17              
18               my $self = bless {
19                 pool => $pool,
20                 max_procs => $pool->{workers},
21 7         9170   }, $class;
22              
23 7         30   return $self;
24             }
25              
26             sub join {
27 4     4 1 545568   my $self = shift;
28 4         22   $self->{pool}->join;
29             }
30              
31             sub defer {
32 44     44 1 45738   my $self = shift;
33 44         150   $self->{pool}->async(@_)
34             }
35              
36             sub process {
37 11     11 1 568638   my $self = shift;
38 11         68   $self->{pool}->async(@_)->recv;
39             }
40              
41             sub map {
42 1     1 1 17   my ($self, $f, @args) = @_;
43              
44             # Inverse semaphore to track pending requests
45 1         13   my $rem = new Coro::Countdown;
46              
47             # Queue each argument and store as an ordered list to preserve original
48             # ordering of the argments
49               my @cvs = map {
50 1         1553     $rem->up;
  11         23398  
51 11         74     $self->defer($f, $_);
52               } @args;
53              
54             # Collect results, retaining original ordering by respecting the original
55             # list index
56 1         109   my @res;
57 1         11   foreach my $i (0 .. $#args) {
58                 async_pool {
59 0     0   0       $res[$i] = $_[0]->recv;
60 0         0       $rem->down;
61 11         86     } $cvs[$i];
62               }
63              
64             # Wait for all requests to complete and return the result
65 1         44   $rem->join;
66 0             return @res;
67             }
68              
69             sub pipeline {
70 0     0 1     my $self = shift;
71 0             return Coro::ProcessPool::Pipeline->new(pool => $self, @_);
72             }
73              
74              
75             1;
76              
77             __END__
78            
79             =pod
80            
81             =encoding UTF-8
82            
83             =head1 NAME
84            
85             Coro::ProcessPool - An asynchronous pool of perl processes
86            
87             =head1 VERSION
88            
89             version 0.29
90            
91             =head1 SYNOPSIS
92            
93             use Coro::ProcessPool;
94             use Coro;
95            
96             my $pool = Coro::ProcessPool->new(
97             max_procs => 4,
98             max_reqs => 100,
99             include => ['/path/to/my/task/classes', '/path/to/other/packages'],
100             );
101            
102             my $double = sub { $_[0] * 2 };
103            
104             #-----------------------------------------------------------------------
105             # Process in sequence, waiting for each result in turn
106             #-----------------------------------------------------------------------
107             my %result;
108             foreach my $i (1 .. 1000) {
109             $result{$i} = $pool->process($double, $i);
110             }
111            
112             #-----------------------------------------------------------------------
113             # Process as a batch
114             #-----------------------------------------------------------------------
115             my @results = $pool->map($double, 1 .. 1000);
116            
117             #-----------------------------------------------------------------------
118             # Defer waiting for result
119             #-----------------------------------------------------------------------
120             my %deferred;
121            
122             $deferred{$_} = $pool->defer($double, $_)
123             foreach 1 .. 1000;
124            
125             # Later
126             foreach my $i (keys %deferred) {
127             print "$i = " . $deferred{$i}->recv . "\n";
128             }
129            
130             #-----------------------------------------------------------------------
131             # Use a "task class" implementing 'new' and 'run'
132             #-----------------------------------------------------------------------
133             my $result = $pool->process('Task::Doubler', 21);
134            
135             #-----------------------------------------------------------------------
136             # Pipelines (work queues)
137             #-----------------------------------------------------------------------
138             my $pipe = $pool->pipeline;
139            
140             # Start producer thread to queue tasks
141             my $producer = async {
142             while (my $task = get_next_task()) {
143             $pipe->queue('Some::TaskClass', $task);
144             }
145            
146             # Let the pipeline know no more tasks are coming
147             $pipe->shutdown;
148             };
149            
150             # Collect the results of each task as they are received
151             while (my $result = $pipe->next) {
152             do_stuff_with($result);
153             }
154            
155             $pool->join;
156            
157             =head1 DESCRIPTION
158            
159             Processes tasks using a pool of external Perl processes.
160            
161             =head1 CONSTRUCTOR
162            
163             my $pool = Coro::ProcessPool->new(
164             max_procs => 4,
165             max_reqs => 100,
166             include => ['path/to/my/packages', 'some/more/packages'],
167             );
168            
169             =head2 max_procs
170            
171             The maximum number of processes to run within the process pool. Defaults
172             to the number of CPUs on the system.
173            
174             =head2 max_reqs
175            
176             The maximum number of tasks a worker process may run before being terminated
177             and replaced with a fresh process. This is useful for tasks that might leak
178             memory over time.
179            
180             =head2 include
181            
182             An optional array ref of directory paths to prepend to the set of directories
183             the worker process will use to find Perl packages.
184            
185             =head1 METHODS
186            
187             =head2 join
188            
189             Cedes control to the event loop until the pool has completed all remaining
190             tasks and woken up any threads watching them.
191            
192             =head2 defer
193            
194             Queues a task to be processed by the pool. Tasks may be specified in either of two
195             forms, as a code ref or the fully qualified name of a perl class which
196             implements two methods, C<new> and C<run>. Any remaining arguments to C<defer>
197             are passed unchanged to the code ref or the C<new> method of the task class.
198            
199             C<defer> will immediately return an L<AnyEvent/condvar> that will wait for and
200             return the result of the task (or croak if the task generated an error).
201            
202             # Using a code ref
203             my $cv = $pool->defer(\&func, $arg1, $arg2, $arg3);
204             my $result = $cv->recv;
205            
206             # With a task class
207             my $cv = $pool->defer('Some::Task::Class', $arg1, $arg2, $arg3);
208             my $result = $cv->recv;
209            
210             =head2 process
211            
212             Calls defer and immediately calls C<recv> on the returned condvar, returning
213             the result. This is useful if your workflow includes multiple threads which
214             share the same pool. All arguments are passed unchanged to C<defer>.
215            
216             =head2 map
217            
218             Like perl's C<map>, applies a code ref to a list of arguments. This method will
219             cede until all results have been returned by the pool, returning the result as
220             a list. The order of arguments and results is preserved as expected.
221            
222             my @results = $pool->map(\&func, $arg1, $arg2, $arg3);
223            
224             =head2 pipeline
225            
226             Returns a L<Coro::ProcessPool::Pipeline> object which can be used to pipe
227             requests through to the process pool. Results then come out the other end of
228             the pipe, not necessarily in the order in which they were queued. It is up to
229             the calling code to perform task accounting (for example, by passing an id in
230             as one of the arguments to the task class).
231            
232             my $pipe = $pool->pipeline;
233            
234             my $producer = async {
235             foreach my $args (@tasks) {
236             $pipe->queue('Some::Class', $args);
237             }
238            
239             $pipe->shutdown;
240             };
241            
242             while (my $result = $pipe->next) {
243             ...
244             }
245            
246             All arguments to C<pipeline()> are passed transparently to the constructor of
247             L<Coro::ProcessPool::Pipeline>. There is no limit to the number of pipelines
248             which may be created for a pool.
249            
250             =head1 A NOTE ABOUT IMPORTS AND CLOSURES
251            
252             Code refs are serialized using L<Data::Dump::Streamer>, allowing closed over
253             variables to be available to the code being called in the sub-process. Mutated
254             variables are I<not> updated when the result is returned.
255            
256             See L<Data::Dump::Streamer/Caveats-Dumping-Closures-(CODE-Refs)> for important
257             notes regarding closures.
258            
259             =head2 Use versus require
260            
261             The C<use> pragma is run at compile time, whereas C<require> is evaluated at
262             runtime. Because of this, the use of C<use> in code passed directly to the
263             C<process> method can fail in the worker process because the C<use> statement
264             has already been evaluated in the parent process when the calling code was
265             compiled.
266            
267             This will not work:
268            
269             $pool->process(sub {
270             use Foo;
271             my $foo = Foo->new();
272             });
273            
274             This will work:
275            
276             $pool->process(sub {
277             require Foo;
278             my $foo = Foo->new();
279             });
280            
281             If C<use> is necessary (for example, to import a method or transform the
282             calling code via import), it is recommended to move the code into its own
283             module (or to explicitly call require and import in the subroutine), which can
284             then be called in the anonymous routine:
285            
286             package Bar;
287            
288             use Foo;
289            
290             sub dostuff {
291             ...
292             }
293            
294             Then, in your caller:
295            
296             $pool->process(sub {
297             require Bar;
298             Bar::dostuff();
299             });
300            
301             Alternately, a task class may be used if dependency management is causing a
302             headaches:
303            
304             my $result = $pool->process('Task::Class', @args);
305            
306             =head1 COMPATIBILITY
307            
308             C<Coro::ProcessPool> will likely break on Win32 due to missing support for
309             non-blocking file descriptors (Win32 can only call C<select> and C<poll> on
310             actual network sockets). Without rewriting this as a network server, which
311             would impact performance and be really annoying, it is likely this module will
312             not support Win32 in the near future.
313            
314             The following modules will get you started if you wish to explore a synchronous
315             process pool on Windows:
316            
317             =over
318            
319             =item L<Win32::Process>
320            
321             =item L<Win32::IPC>
322            
323             =item L<Win32::Pipe>
324            
325             =back
326            
327             =head1 SEE ALSO
328            
329             =over
330            
331             =item L<Coro>
332            
333             =item L<AnyEvent/condvar>
334            
335             =back
336            
337             =head1 AUTHOR
338            
339             Jeff Ober <sysread@fastmail.fm>
340            
341             =head1 COPYRIGHT AND LICENSE
342            
343             This software is copyright (c) 2017 by Jeff Ober.
344            
345             This is free software; you can redistribute it and/or modify it under
346             the same terms as the Perl 5 programming language system itself.
347            
348             =cut
349