File Coverage

blib/lib/Parallel/Runner.pm
Criterion Covered Total %
statement 88 90 97.7
branch 34 36 94.4
condition 19 20 95.0
subroutine 21 21 100.0
pod 5 5 100.0
total 167 172 97.0


line stmt bran cond sub pod time code
1             package Parallel::Runner;
2 25     25   30542 use strict;
  25         63  
  25         983  
3 25     25   152 use warnings;
  25         26  
  25         760  
4              
5 25     25   25622 use POSIX ();
  25         218610  
  25         836  
6 25     25   26974 use Time::HiRes qw/sleep/;
  25         3769071  
  25         295  
7 25     25   6949 use Carp;
  25         63  
  25         2223  
8 25     25   11788 use Child qw/child/;
  25         72408  
  25         3639  
9              
10             our $VERSION = '0.013';
11              
12             for my $accessor (qw/ exit_callback data_callback iteration_callback _children pid max iteration_delay reap_callback pipe/) {
13             my $sub = sub {
14 22426     22426   79156 my $self = shift;
15 22426 100       93832 ( $self->{$accessor} ) = @_ if @_;
16 22426         364857866 return $self->{$accessor};
17             };
18 25     25   170 no strict 'refs';
  25         73  
  25         30517  
19             *$accessor = $sub;
20             }
21              
22             sub children {
23 3339     3339 1 346202 my $self = shift;
24 3339         11453 my @active;
25              
26 3339 100       10644 for my $proc ( @{$self->_children || []}, @_ ) {
  3339         19751  
27 8163 100       64320 if ( defined $proc->exit_status ) {
28 117 100       18528 if ( $self->data_callback ) {
29 8         106 my $data = $proc->read();
30 8         414 $self->data_callback->($data);
31             }
32              
33 117 100       537 $self->reap_callback->( $proc->exit_status, $proc->pid, $proc->pid, $proc )
34             if $self->reap_callback;
35              
36 117         93326 next;
37             }
38 8046         713891 push @active => $proc;
39             }
40              
41 3339         14797 $self->_children( \@active );
42 3339         23226 return @active;
43             }
44              
45             sub new {
46 58     58 1 33337 my $class = shift;
47 58         134 my ($max) = shift;
48 58   100     1085 return bless(
49             {
50             _children => [],
51             pid => $$,
52             max => $max || 1,
53             iteration_delay => 0.1,
54             @_,
55             },
56             $class
57             );
58             }
59              
60             sub run {
61 134     134 1 26211 my $self = shift;
62 134         333 my ( $code, $force_fork ) = @_;
63 134 100       397 croak("Called run() in child process")
64             unless $self->pid == $$;
65              
66 124   100     1617 my $fork = $force_fork || $self->max > 1;
67 124 100       817 return $self->_fork($code)
68             if $fork;
69              
70 10         69 my ($data) = $code->();
71 10 100       15001091 $self->data_callback->($data)
72             if $self->data_callback;
73              
74 7         28 return;
75             }
76              
77             sub _fork {
78 114     114   194 my $self = shift;
79 114         208 my ($code) = @_;
80              
81             # Wait for a slot
82             $self->_iterate(
83             sub {
84 2907     2907   56347 $self->children >= $self->max;
85             }
86 114         1263 );
87              
88             my $proc = Child->new(
89             sub {
90 17     17   473432 my $parent = shift;
91 17         818 $self->_children( [] );
92              
93 17         542 my @return = $code->($parent);
94              
95 17 100       78004881 $self->exit_callback->(@return)
96             if $self->exit_callback;
97              
98 17 100       424 $parent->write( $return[0] )
99             if $self->data_callback;
100              
101             },
102 100 100 100     966 $self->pipe || $self->data_callback ? ( pipe => $self->pipe ) : ()
103             )->start();
104              
105 634     634   10590 $self->_iterate( sub { !defined $proc->exit_status } )
106 83 100       3806062 if $self->max == 1;
107              
108 81         2767 $self->children($proc);
109              
110 81         495 return $proc;
111             }
112              
113             sub finish {
114 60     60 1 30148 my $self = shift;
115 60     270   1133 $self->_iterate( sub { $self->children }, @_ );
  270         2305  
116             }
117              
118             sub _iterate {
119 193     193   416 my $self = shift;
120 193         413 my ( $condition, $timeout, $timeoutsub ) = @_;
121 193         554 my $counter = 0;
122              
123 193         630 while ( $condition->() ) {
124 3635 100       120305 $self->iteration_callback->($self)
125             if $self->iteration_callback;
126              
127 3635         20232 $counter += $self->iteration_delay;
128 3635 100 100     16652 last if $timeout and $counter >= $timeout;
129              
130 3634         240646 sleep $self->iteration_delay;
131             }
132              
133 177 100 66     4324 $timeoutsub->()
      100        
134             if $timeout
135             && $timeoutsub
136             && $counter >= $timeout;
137 177         935 1;
138             }
139              
140             sub killall {
141 1     1 1 3 my $self = shift;
142 1         3 my ( $sig, $warn ) = @_;
143              
144 1 50       215 if ($warn) {
145 1         4 warn time . " - Killing: $_ - $sig\n" for grep { $_->pid } $self->children;
  1         12  
146             }
147              
148 1         40 $_->kill($sig) for $self->children;
149             }
150              
151             sub DESTROY {
152 58     58   40057169 my $self = shift;
153             return
154 58 100 100     395 unless $self->pid == $$
155             && $self->children;
156 1         32 warn <
157             Parallel::Runner object destroyed without first calling finish(), This will
158             terminate all your child processes. This either means you forgot to call
159             finish() or your parent process has died.
160             EOT
161              
162 1 50       25 return $self->finish()
163             if $^O eq 'MSWin32';
164              
165             $self->finish(
166             1,
167             sub {
168 1     1   34 $self->killall( 15, 1 );
169             $self->finish(
170             4,
171             sub {
172 0           $self->killall( 9, 1 );
173 0           $self->finish(10);
174             }
175 1         68 );
176             }
177 1         34 );
178             }
179              
180             1;
181              
182             =pod
183              
184             =head1 NAME
185              
186             Parallel::Runner - An object to manage running things in parallel processes.
187              
188             =head1 DESCRIPTION
189              
190             There are several other modules to do this, you probably want one of them. This
191             module exists as a super specialised parallel task manager. You create the
192             object with a proces limit and callbacks for what to do while waiting for a
193             free process slot, as well as a callback for what a process shoudl do just
194             before exiting.
195              
196             You must explicetly call $runner->finish() when you are done. If the runner is
197             destroyed before it's children are finished a warning will be generated and
198             your child processes will be killed, by force if necessary.
199              
200             If you specify a maximum of 1 then no forking will occur, and run() will block
201             until the coderef returns. You can force a fork by providing a boolean true
202             value as the second argument to run(), this will force the runner to fork
203             before running the coderef, however run() will still block until it the child
204             exits.
205              
206             =head1 SYNOPSYS
207              
208             #!/usr/bin/perl
209             use strict;
210             use warnings;
211             use Parallel::Runner;
212              
213             my $runner = Parallel::Runner->new(4);
214             $runner->run( sub { ... } );
215             $runner->run( sub { ... } );
216             $runner->run( sub { ... } );
217             $runner->run( sub { ... } );
218              
219             # This will block until one of the previous 4 finishes
220             $runner->run( sub { ... } );
221              
222             # Do not forget this.
223             $runner->finish;
224              
225             =head1 CONSTRUCTOR
226              
227             =over 4
228              
229             =item $runner = $class->new( $max, $accessor => $value, ... );
230              
231             Create a new instance of Parallel::Runner. $accessor can be anything listed
232             under the ACCESSORS section. $max should be the maximum number of processes
233             allowed, defaults to 1.
234              
235             =back
236              
237             =head1 ACCESSORS
238              
239             These are simple accessors, provididng an argument sets the accessor to that
240             argument, no argument it simply returns the current value.
241              
242             =over 4
243              
244             =item $val = $runner->data_callback( \&callback )
245              
246             If this is specified than IPC will be automatically enabled, and the final
247             return from each process will be passed into this handler in the main process.
248             Due to the way IPC works only strings/numerical data is passed, if you need to
249             pass a ref you will need to serialize it yourself before returning it, followed
250             by deserializing it in your callback.
251              
252             Example:
253              
254             # Place to put the accumulated data
255             my @accum_data;
256              
257             # Create the runner with a callback that pushes the data onto our array.
258             $runner = $CLASS->new( 2,
259             data_callback => sub {
260             my ($data) = @_;
261             push @accum_data => $data;
262             },
263             );
264              
265             # 4 processes that return data
266             $runner->run( sub { return "foo" });
267             $runner->run( sub { return "bar" });
268             $runner->run( sub { return "baz" });
269             $runner->run( sub { return "bat" });
270             $runner->finish;
271              
272             # Verify the data (order is not predictable)
273             is_deeply(
274             [ sort @accum_data ],
275             [ sort qw/foo bar baz bat/ ],
276             "Got all data returned by subprocesses"
277             );
278              
279             =item $val = $runner->exit_callback( \&callback )
280              
281             Codref to call just before a child exits (called within child)
282              
283             =item $val = $runner->iteration_delay( $float );
284              
285             How long to wait per iterate if nothing has changed.
286              
287             =item $val = $runner->iteration_callback( $newval )
288              
289             Coderef to call multiple times in a loop while run() is blocking waiting for a
290             process slot.
291              
292             =item $val = $runner->reap_callback( $newval )
293              
294             Codref to call whenever a pid is reaped using waitpid. The callback sub will be
295             passed 3 values The first is the exit status of the child process. The second
296             is the pid of the child process. The third used to be the return of waitpid,
297             but this is depricated as L is now used and throws an exception when
298             waitpid is not what it should be. The third is simply the pid of the child
299             process again. The final argument is the child process object itself.
300              
301             $runner->reap_callback( sub {
302             my ( $status, $pid, $pid_again, $proc ) = @_;
303              
304             # Status as returned from system, so 0 is good, 1+ is bad.
305             die "Child $pid did not exit 0"
306             if $status;
307             });
308              
309             =item @children = $runner->children( @append )
310              
311             Returns a list of L objects.
312              
313             =item $val = $runner->pid()
314              
315             pid of the parent process
316              
317             =item $val = $runner->max( $newval )
318              
319             Maximum number of children
320              
321             =back
322              
323             =head1 OBJECT METHODS
324              
325             =over 4
326              
327             =item run( $code )
328              
329             =item run( $code, $force_fork )
330              
331             Run the specified code in a child process. Blocks if no free slots are
332             available. Force fork can be used to force a fork when max is 1, however it
333             will still block until the child exits.
334              
335             =item finish()
336              
337             =item finish( $timeout )
338              
339             =item finish( $timeout, $timeoutcallback )
340              
341             Wait for all children to finish, then clean up after them. If a timeout is
342             specified it will return after the timeout regardless of wether or not children
343             have all exited. If there is a timeout call back then that code will be run
344             upon timeout just before the method returns.
345              
346             NOTE: DO NOT LET YOUR RUNNER BE DESTROYED BEFORE FINISH COMPLETES WITHOUT A
347             TIMEOUT.
348              
349             the runner will kill all children, possibly with force if your runner is
350             destroyed with children still running, or not waited on.
351              
352             =item killall( $sig )
353              
354             Send all children the specified kill signal.
355              
356             =item DESTROY()
357              
358             Automagically called when the object is destroyed. If called while children are
359             running it will forcefully clean up after you as follows:
360              
361             1) Sends an ugly warning.
362              
363             2) Will first give all your children 1 second to complete.
364              
365             Windows) Strawberry fails with processes, so on windows DESTROY will wait as
366             long as needed, possibly forever.
367              
368             3) Sends kill signal 15 to all children then waits up to 4 seconds.
369              
370             4) Sends kill signal 9 to any remaining children then waits up to 10 seconds
371              
372             5) Gives up and returns
373              
374             =back
375              
376             =head1 FENNEC PROJECT
377              
378             This module is part of the Fennec project. See L for more details.
379             Fennec is a project to develop an extendable and powerful testing framework.
380             Together the tools that make up the Fennec framework provide a potent testing
381             environment.
382              
383             The tools provided by Fennec are also useful on their own. Sometimes a tool
384             created for Fennec is useful outside the greator framework. Such tools are
385             turned into their own projects. This is one such project.
386              
387             =over 2
388              
389             =item L - The core framework
390              
391             The primary Fennec project that ties them all together.
392              
393             =back
394              
395             =head1 AUTHORS
396              
397             Chad Granum L
398              
399             =head1 COPYRIGHT
400              
401             Copyright (C) 2010 Chad Granum
402              
403             Parallel-Runner is free software; Standard perl licence.
404              
405             Parallel-Runner is distributed in the hope that it will be useful, but WITHOUT
406             ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
407             FOR A PARTICULAR PURPOSE. See the license for more details.