File Coverage

blib/lib/Parallel/Iterator.pm
Criterion Covered Total %
statement 175 189 92.5
branch 65 82 79.2
condition 24 26 92.3
subroutine 30 30 100.0
pod 3 3 100.0
total 297 330 90.0


line stmt bran cond sub pod time code
1 9     9   523707 use 5.008; use warnings; use strict;
  9     9   54  
  9     9   42  
  9         16  
  9         231  
  9         37  
  9         18  
  9         335  
2              
3             package Parallel::Iterator;
4              
5 9     9   51 use Carp;
  9         14  
  9         518  
6 9     9   4884 use Storable qw( store_fd fd_retrieve dclone );
  9         24558  
  9         487  
7 9     9   4051 use IO::Handle;
  9         45536  
  9         323  
8 9     9   3379 use IO::Select;
  9         12062  
  9         344  
9 9     9   61 use Config;
  9         18  
  9         376  
10              
11             our $VERSION = '1.001';
12              
13 9     9   44 use Exporter (); *import = \&Exporter::import;
  9         15  
  9         502  
14             our @EXPORT_OK = qw( iterate iterate_as_array iterate_as_hash );
15              
16 9     9   80 use constant IS_WIN32 => ( $^O =~ /^(MS)?Win32$/ );
  9         24  
  9         5661  
17              
18             my %DEFAULTS = (
19             workers => ( ( $Config{d_fork} && !IS_WIN32 ) ? 10 : 0 ),
20             onerror => 'die',
21             nowarn => 0,
22             batch => 1,
23             adaptive => 0,
24             );
25              
26             =head1 NAME
27              
28             Parallel::Iterator - Simple parallel execution
29              
30             =head1 SYNOPSIS
31              
32             use Parallel::Iterator qw( iterate );
33              
34             # A very expensive way to double 100 numbers...
35            
36             my @nums = ( 1 .. 100 );
37            
38             my $iter = iterate( sub {
39             my ( $id, $job ) = @_;
40             return $job * 2;
41             }, \@nums );
42            
43             my @out = ();
44             while ( my ( $index, $value ) = $iter->() ) {
45             $out[$index] = $value;
46             }
47              
48             The C function applies a user supplied transformation function to
49             each element in a list, returning a new list containing the
50             transformed elements.
51              
52             =head1 DESCRIPTION
53              
54             This module provides a 'parallel map'. Multiple worker processes are
55             forked so that many instances of the transformation function may be
56             executed simultaneously.
57              
58             For time consuming operations, particularly operations that spend most
59             of their time waiting for I/O, this is a big performance win. It also
60             provides a simple idiom to make effective use of multi CPU systems.
61              
62             There is, however, a considerable overhead associated with forking, so
63             the example in the synopsis (doubling a list of numbers) is I a
64             sensible use of this module.
65              
66             =head1 MANUAL
67              
68             =head2 Basic Usage
69              
70             Imagine you have an array of URLs to fetch:
71              
72             my @urls = qw(
73             http://google.com/
74             http://hexten.net/
75             http://search.cpan.org/
76             ... and lots more ...
77             );
78              
79             Write a function that retrieves a URL and returns its contents or undef
80             if it can't be fetched:
81              
82             sub fetch {
83             my ($id, $url) = @_;
84             my $resp = $ua->get($url);
85             return unless $resp->is_success;
86             return $resp->content;
87             };
88              
89             Now write a function to synthesize a special kind of iterator:
90              
91             sub list_iter {
92             my @ar = @_;
93             my $pos = 0;
94             return sub {
95             return if $pos >= @ar;
96             my @r = ( $pos, $ar[$pos] ); # Note: returns ( index, value )
97             $pos++;
98             return @r;
99             };
100             }
101              
102             The returned iterator will return each element of the array in turn and
103             then undef. Actually it returns both the index I the value of each
104             element in the array. Because multiple instances of the transformation
105             function execute in parallel the results won't necessarily come back in
106             order. The array index will later allow us to put completed items in the
107             correct place in an output array.
108              
109             Get an iterator for the list of URLs:
110              
111             my $url_iter = list_iter( @urls );
112              
113             Then wrap it in another iterator which will return the transformed results:
114              
115             my $page_iter = iterate( \&fetch, $url_iter );
116              
117             Finally loop over the returned iterator storing results:
118              
119             my @out = ( );
120             while ( my ( $index, $value ) = $page_iter->() ) {
121             $out[$index] = $value;
122             }
123              
124             Behind the scenes your program forked into ten (by default) instances of
125             itself and executed the page requests in parallel.
126              
127             =head2 Simpler interfaces
128              
129             Having to construct an iterator is a pain so C is smart enough
130             to do that for you. Instead of passing an iterator just pass a reference
131             to the array:
132              
133             my $page_iter = iterate( \&fetch, \@urls );
134              
135             If you pass a hash reference the iterator you get back will return key,
136             value pairs:
137              
138             my $some_iter = iterate( \&fetch, \%some_hash );
139              
140             If the returned iterator is inconvenient you can get back a hash or
141             array instead:
142              
143             my @done = iterate_as_array( \&fetch, \@urls );
144              
145             my %done = iterate_as_hash( \&worker, \%jobs );
146              
147             =head2 How It Works
148              
149             The current process is forked once for each worker. Each forked child is
150             connected to the parent by a pair of pipes. The child's STDIN, STDOUT
151             and STDERR are unaffected.
152              
153             Input values are serialised (using Storable) and passed to the workers.
154             Completed work items are serialised and returned.
155              
156             =head2 Caveats
157              
158             Parallel::Iterator is designed to be simple to use - but the underlying
159             forking of the main process can cause mystifying problems unless you
160             have an understanding of what is going on behind the scenes.
161              
162             =head3 Worker execution enviroment
163              
164             All code apart from the worker subroutine executes in the parent process
165             as normal. The worker executes in a forked instance of the parent
166             process. That means that things like this won't work as expected:
167              
168             my %tally = ();
169             my @r = iterate_as_array( sub {
170             my ($id, $name) = @_;
171             $tally{$name}++; # might not do what you think it does
172             return reverse $name;
173             }, \@names );
174              
175             # Now print out the tally...
176             while ( my ( $name, $count ) = each %tally ) {
177             printf("%5d : %s\n", $count, $name);
178             }
179              
180             Because the worker is a closure it can see the C<%tally> hash from its
181             enclosing scope; but because it's running in a forked clone of the parent
182             process it modifies its own copy of C<%tally> rather than the copy for
183             the parent process.
184              
185             That means that after the job terminates the C<%tally> in the parent
186             process will be empty.
187              
188             In general you should avoid side effects in your worker subroutines.
189              
190             =head3 Serialization
191              
192             Values are serialised using L to pass to the worker subroutine
193             and results from the worker are again serialised before being passed
194             back. Be careful what your values refer to: everything has to be
195             serialised. If there's an indirect way to reach a large object graph
196             Storable will find it and performance will suffer.
197              
198             To find out how large your serialised values are serialise one of them
199             and check its size:
200              
201             use Storable qw( freeze );
202             my $serialized = freeze $some_obj;
203             print length($serialized), " bytes\n";
204              
205             In your tests you may wish to guard against the possibility of a change
206             to the structure of your values resulting in a sudden increase in
207             serialized size:
208              
209             ok length(freeze $some_obj) < 1000, "Object too bulky?";
210              
211             See the documetation for L for other caveats.
212              
213             =head3 Performance
214              
215             Process forking is expensive. Only use Parallel::Iterator in cases where:
216              
217             =over
218              
219             =item the worker waits for I/O
220              
221             The case of fetching web pages is a good example of this. Fetching a
222             page with LWP::UserAgent may take as long as a few seconds but probably
223             consumes only a few milliseconds of processor time. Running many
224             requests in parallel is a huge win - but be kind to the server you're
225             talking to: don't launch a lot of parallel requests unless it's your
226             server or you know it can handle the load.
227              
228             =item the worker is CPU intensive and you have multiple cores / CPUs
229              
230             If the worker is doing an expensive calculation you can parallelise that
231             across multiple CPU cores. Benchmark first though. There's a
232             considerable overhead associated with Parallel::Iterator; unless your
233             calculations are time consuming that overhead will dwarf whatever time
234             they take.
235              
236             =back
237              
238             =head1 INTERFACE
239              
240             =head2 C<< iterate( [ $options ], $worker, $iterator ) >>
241              
242             Get an iterator that applies the supplied transformation function to
243             each value returned by the input iterator.
244              
245             Instead of an iterator you may pass an array or hash reference and
246             C will convert it internally into a suitable iterator.
247              
248             If you are doing this you may wish to investigate C and
249             C.
250              
251             =head3 Options
252              
253             A reference to a hash of options may be supplied as the first argument.
254             The following options are supported:
255              
256             =over
257              
258             =item C
259              
260             The number of concurrent processes to launch. Set this to 0 to disable
261             forking. Defaults to 10 on systems that support fork and 0 (disable
262             forking) on those that do not.
263              
264             =item C
265              
266             Normally C will issue a warning and fall back to single process
267             mode on systems on which fork is not available. This option supresses
268             that warning.
269              
270             =item C
271              
272             Ordinarily items are passed to the worker one at a time. If you are
273             processing a large number of items it may be more efficient to process
274             them in batches. Specify the batch size using this option.
275              
276             Batching is transparent from the caller's perspective. Internally it
277             modifies the iterators and worker (by wrapping them in additional
278             closures) so that they pack, process and unpack chunks of work.
279              
280             =item C
281              
282             Extending the idea of batching a number of work items to amortize the
283             overhead of passing work to and from parallel workers you may also ask
284             C to heuristically determine the batch size by setting the
285             C option to a numeric value.
286              
287             The batch size will be computed as
288              
289             / /
290              
291             A larger value for C will reduce the rate at which the batch
292             size increases. Good values tend to be in the range 1 to 2.
293              
294             You can also specify lower and, optionally, upper bounds on the batch
295             size by passing an reference to an array containing ( lower bound,
296             growth ratio, upper bound ). The upper bound may be omitted.
297              
298             my $iter = iterate(
299             { adaptive => [ 5, 2, 100 ] },
300             $worker, \@stuff );
301              
302             =item C
303              
304             The action to take when an error is thrown in the iterator. Possible
305             values are 'die', 'warn' or a reference to a subroutine that will be
306             called with the index of the job that threw the exception and the value
307             of C<$@> thrown.
308              
309             iterate( {
310             onerror => sub {
311             my ($id, $err) = @_;
312             $self->log( "Error for index $id: $err" );
313             },
314             $worker,
315             \@jobs
316             );
317              
318             The default is 'die'.
319            
320             =back
321              
322             =cut
323              
324             sub _massage_iterator {
325 27     27   40 my $iter = shift;
326 27 100       99 if ( 'ARRAY' eq ref $iter ) {
    100          
    50          
327 21         1606 my @ar = @$iter;
328 21         64 my $pos = 0;
329             return sub {
330 50202 100   50202   70809 return if $pos >= @ar;
331 50126         64997 my @r = ( $pos, $ar[$pos] );
332 50126         52753 $pos++;
333 50126         104815 return @r;
334 21         135 };
335             }
336             elsif ( 'HASH' eq ref $iter ) {
337 2         12 my %h = %$iter;
338 2         7 my @k = keys %h;
339             return sub {
340 12 100   12   36 return unless @k;
341 10         70 my $k = shift @k;
342 10         52 return ( $k, $h{$k} );
343 2         16 };
344             }
345             elsif ( 'CODE' eq ref $iter ) {
346 4         8 return $iter;
347             }
348             else {
349 0         0 croak "Iterator must be a code, array or hash ref";
350             }
351             }
352              
353             sub _nonfork {
354 9     9   28 my ( $options, $worker, $iter ) = @_;
355              
356             return sub {
357 551     551   760 while ( 1 ) {
358 551 100       747 if ( my @next = $iter->() ) {
359 542         1062 my ( $id, $work ) = @next;
360             # dclone so that we have the same semantics as the
361             # forked version.
362 542 100 66     22723 $work = dclone $work if defined $work && ref $work;
363 542         987 my $result = eval { $worker->( $id, $work ) };
  542         863  
364 542 50       7578 if ( my $err = $@ ) {
365 0         0 $options->{onerror}->( $id, $err );
366             }
367             else {
368 542         1149 return ( $id, $result );
369             }
370             }
371             else {
372 9         30 return;
373             }
374             }
375 9         42 };
376             }
377              
378             # Does this sub look a bit long to you? :)
379             sub _fork {
380 18     18   51 my ( $options, $worker, $iter ) = @_;
381              
382 18         30 my @workers = ();
383 18         24 my @result_queue = ();
384 18         227 my $select = IO::Select->new;
385 18         248 my $rotate = 0;
386              
387             return sub {
388             LOOP: {
389             # Make new workers
390 946   100 946   1704 while ( @workers < $options->{workers} && ( my @next = $iter->() ) )
  1283         2653  
391             {
392              
393 76         2830 my ( $my_rdr, $my_wtr, $child_rdr, $child_wtr )
394             = map IO::Handle->new, 1 .. 4;
395              
396 76 50       10925 pipe $child_rdr, $my_wtr
397             or croak "Can't open write pipe ($!)\n";
398              
399 76 50       1730 pipe $my_rdr, $child_wtr
400             or croak "Can't open read pipe ($!)\n";
401              
402 76 50       45629 if ( my $pid = fork ) {
403             # Parent
404 76         4697 close $_ for $child_rdr, $child_wtr;
405 76         1133 push @workers, $pid;
406 76         3799 $select->add( [ $my_rdr, $my_wtr, 0 ] );
407 76         10250 _put_obj( \@next, $my_wtr );
408             }
409             else {
410             # Child
411 0         0 close $_ for $my_rdr, $my_wtr;
412              
413             # Don't execute any END blocks
414 9     9   3704 use POSIX '_exit';
  9         46958  
  9         44  
415 0         0 eval q{END { _exit 0 }};
416              
417             # Worker loop
418 0         0 while ( defined( my $job = _get_obj( $child_rdr ) ) ) {
419 0         0 my $result = eval { $worker->( @$job ) };
  0         0  
420 0         0 my $err = $@;
421 0 0       0 _put_obj(
422             [
423             $err
424             ? ( 'E', $job->[0], $err )
425             : ( 'R', $job->[0], $result )
426             ],
427             $child_wtr
428             );
429             }
430              
431             # End of stream
432 0         0 _put_obj( undef, $child_wtr );
433 0         0 close $_ for $child_rdr, $child_wtr;
434             # We use CORE::exit for MP compatibility
435 0         0 CORE::exit;
436             }
437             }
438              
439 1283 100       2010 return @{ shift @result_queue } if @result_queue;
  928         2231  
440 355 100       1229 if ( $select->count ) {
441 338         1425 eval {
442 338         677 my @rdr = $select->can_read;
443             # Anybody got completed work?
444 338         33043 for my $r ( @rdr ) {
445 1014         2652 my ( $rh, $wh, $eof ) = @$r;
446 1014 100       1772 if ( defined( my $results = _get_obj( $rh ) ) ) {
447 939         1522 my $type = shift @$results;
448 939 100       1658 if ( $type eq 'R' ) {
    50          
449 928         1474 push @result_queue, $results;
450             }
451             elsif ( $type eq 'E' ) {
452 11         373 $options->{onerror}->( @$results );
453             }
454             else {
455 0         0 die "Bad result type: $type";
456             }
457              
458             # We operate a strict one in, one out policy
459             # - which avoids deadlocks. Having received
460             # the previous result send a new work value.
461 938 50       1607 unless ( $eof ) {
462 938 100       1605 if ( my @next = $iter->() ) {
463 863         2879 _put_obj( \@next, $wh );
464             }
465             else {
466 75         207 _put_obj( undef, $wh );
467 75         679 close $wh;
468 75         147 @{$r}[ 1, 2 ] = ( undef, 1 );
  75         755  
469             }
470             }
471             }
472             else {
473 75         238 $select->remove( $r );
474 75         3278 close $rh;
475             }
476             }
477             };
478              
479 338 100       1101 if ( my $err = $@ ) {
480             # Finish all the workers
481 1         17 _put_obj( undef, $_->[1] ) for $select->handles;
482              
483             # And wait for them to exit
484 1         1021 waitpid( $_, 0 ) for @workers;
485              
486             # Rethrow
487 1         13 die $err;
488             }
489              
490 337         617 redo LOOP;
491             }
492 17         19184 waitpid( $_, 0 ) for @workers;
493 17         131 return;
494             }
495 18         215 };
496             }
497              
498             sub _batch_input_iter {
499 10     10   27 my ( $code, $options ) = @_;
500              
501 10 100       32 if ( my $adapt = $options->{adaptive} ) {
502 6   100     38 my $workers = $options->{workers} || 1;
503 6         15 my $count = 0;
504              
505 6 100       31 $adapt = [ 1, $adapt, undef ]
506             unless 'ARRAY' eq ref $adapt;
507              
508 6         34 my ( $min, $ratio, $max ) = @$adapt;
509 6 100 66     71 $min = 1 unless defined $min && $min > 1;
510              
511             return sub {
512 774     774   1237 my @chunk = ();
513              
514             # Adapt batch size
515 774         1683 my $batch = $count / $workers / $ratio;
516 774 100       1555 $batch = $min if $batch < $min;
517 774 100 100     2162 $batch = $max if defined $max && $batch > $max;
518              
519 774   100     2364 while ( @chunk < $batch && ( my @next = $code->() ) ) {
520 30000         36153 push @chunk, \@next;
521 30000         53201 $count++;
522             }
523              
524 774 100       1927 return @chunk ? ( 0, \@chunk ) : ();
525 6         69 };
526             }
527             else {
528 4         8 my $batch = $options->{batch};
529              
530             return sub {
531 226     226   313 my @chunk = ();
532 226   100     916 while ( @chunk < $batch && ( my @next = $code->() ) ) {
533 20000         36665 push @chunk, \@next;
534             }
535 226 100       570 return @chunk ? ( 0, \@chunk ) : ();
536 4         25 };
537             }
538             }
539              
540             sub _batch_output_iter {
541 10     10   20 my $code = shift;
542 10         17 my @queue = ();
543             return sub {
544 50010 100   50010   65645 unless ( @queue ) {
545 955 100       1162 if ( my ( undef, $chunk ) = $code->() ) {
546 945         3258 @queue = @$chunk;
547             }
548             else {
549 10         33 return;
550             }
551             }
552 50000         52382 return @{ shift @queue };
  50000         85037  
553 10         61 };
554 0         0 return $code;
555             }
556              
557             sub _batch_worker {
558 10     10   20 my $code = shift;
559             return sub {
560 388     388   562 my ( undef, $chunk ) = @_;
561 388         547 for my $item ( @$chunk ) {
562 25000         71971 $item->[1] = $code->( @$item );
563             }
564 388         1258 return $chunk;
565 10         81 };
566             }
567              
568             sub iterate {
569 27 50   27 1 11536 my %options = ( %DEFAULTS, %{ 'HASH' eq ref $_[0] ? shift : {} } );
  27         303  
570              
571 27 50       135 croak "iterate takes 2 or 3 args" unless @_ == 2;
572              
573 27         96 my @bad_opt = grep { !exists $DEFAULTS{$_} } keys %options;
  135         251  
574 27 50       86 croak "Unknown option(s): ", join( ', ', sort @bad_opt ), "\n"
575             if @bad_opt;
576              
577 27         53 my $worker = shift;
578 27 50       104 croak "Worker must be a coderef"
579             unless 'CODE' eq ref $worker;
580              
581 27         77 my $iter = _massage_iterator( shift );
582              
583 27 100       293 if ( $options{onerror} =~ /^(die|warn)$/ ) {
584 26         2979 $options{onerror} = eval "sub { shift; $1 shift }";
585             }
586              
587             croak "onerror option must be 'die', 'warn' or a code reference"
588 27 50       129 unless 'CODE' eq ref $options{onerror};
589              
590 27 100 100     594 if ( $options{workers} > 0 && $DEFAULTS{workers} == 0 ) {
591             warn "Fork not available; falling back to single process mode\n"
592 3 50       8 unless $options{nowarn};
593 3         6 $options{workers} = 0;
594             }
595              
596 27 100       98 my $factory = $options{workers} == 0 ? \&_nonfork : \&_fork;
597              
598 27 100 100     167 if ( $options{batch} > 1 || $options{adaptive} ) {
599 10         65 return _batch_output_iter(
600             $factory->(
601             \%options,
602             _batch_worker( $worker ),
603             _batch_input_iter( $iter, \%options )
604             )
605             );
606             }
607             else {
608             # OK. Ready. Let's do it.
609 17         71 return $factory->( \%options, $worker, $iter );
610             }
611             }
612              
613             =head2 C<< iterate_as_array >>
614              
615             As C but instead of returning an iterator returns an array
616             containing the collected output from the iterator. In a scalar context
617             returns a reference to the same array.
618              
619             For this to work properly the input iterator must return (index, value)
620             pairs. This allows the results to be placed in the correct slots in the
621             output array. The simplest way to do this is to pass an array reference
622             as the input iterator:
623              
624             my @output = iterate_as_array( \&some_handler, \@input );
625              
626             =cut
627              
628             sub iterate_as_array {
629 17     17 1 449915 my $iter = iterate( @_ );
630 17         42 my @out = ();
631 17         44 while ( my ( $index, $value ) = $iter->() ) {
632 50110         66842 $out[$index] = $value;
633             }
634 17 50       4819 return wantarray ? @out : \@out;
635             }
636              
637             =head2 C<< iterate_as_hash >>
638              
639             As C but instead of returning an iterator returns a hash
640             containing the collected output from the iterator. In a scalar context
641             returns a reference to the same hash.
642              
643             For this to work properly the input iterator must return (key, value)
644             pairs. This allows the results to be placed in the correct slots in the
645             output hash. The simplest way to do this is to pass a hash reference as
646             the input iterator:
647              
648             my %output = iterate_as_hash( \&some_handler, \%input );
649              
650             =cut
651              
652             sub iterate_as_hash {
653 1     1 1 1167 my $iter = iterate( @_ );
654 1         7 my %out = ();
655 1         11 while ( my ( $key, $value ) = $iter->() ) {
656 5         28 $out{$key} = $value;
657             }
658 1 50       98 return wantarray ? %out : \%out;
659             }
660              
661             sub _get_obj {
662 1014     1014   1243 my $fd = shift;
663 1014         2257 my $r = fd_retrieve $fd;
664 1014         63607 return $r->[0];
665             }
666              
667             sub _put_obj {
668 1015     1015   1494 my ( $obj, $fd ) = @_;
669 1015         4416 store_fd [$obj], $fd;
670 1015         99697 $fd->flush;
671             }
672              
673             1;
674              
675             __END__