File Coverage

blib/lib/Parallel/Iterator.pm
Criterion Covered Total %
statement 173 187 92.5
branch 65 82 79.2
condition 24 26 92.3
subroutine 29 29 100.0
pod 3 3 100.0
total 294 327 89.9


line stmt bran cond sub pod time code
1             # $Id: Iterator.pm 2746 2007-10-19 16:36:50Z andy $
2             package Parallel::Iterator;
3              
4 9     9   308797 use warnings;
  9         24  
  9         307  
5 9     9   51 use strict;
  9         20  
  9         415  
6 9     9   51 use Carp;
  9         21  
  9         800  
7 9     9   12174 use Storable qw( store_fd fd_retrieve dclone );
  9         42315  
  9         830  
8 9     9   11574 use IO::Handle;
  9         69931  
  9         467  
9 9     9   10493 use IO::Select;
  9         17291  
  9         436  
10 9     9   65 use Config;
  9         16  
  9         575  
11              
12             require 5.008;
13              
14             our $VERSION = '1.00';
15 9     9   60 use base qw( Exporter );
  9         18  
  9         1328  
16             our @EXPORT_OK = qw( iterate iterate_as_array iterate_as_hash );
17              
18 9     9   55 use constant IS_WIN32 => ( $^O =~ /^(MS)?Win32$/ );
  9         17  
  9         7665  
19              
20             my %DEFAULTS = (
21             workers => ( ( $Config{d_fork} && !IS_WIN32 ) ? 10 : 0 ),
22             onerror => 'die',
23             nowarn => 0,
24             batch => 1,
25             adaptive => 0,
26             );
27              
28             =head1 NAME
29              
30             Parallel::Iterator - Simple parallel execution
31              
32             =head1 VERSION
33              
34             This document describes Parallel::Iterator version 1.00
35              
36             =head1 SYNOPSIS
37              
38             use Parallel::Iterator qw( iterate );
39              
40             # A very expensive way to double 100 numbers...
41            
42             my @nums = ( 1 .. 100 );
43            
44             my $iter = iterate( sub {
45             my ( $id, $job ) = @_;
46             return $job * 2;
47             }, \@nums );
48            
49             my @out = ();
50             while ( my ( $index, $value ) = $iter->() ) {
51             $out[$index] = $value;
52             }
53            
54             =head1 DESCRIPTION
55              
56             The C<map> function applies a user supplied transformation function to
57             each element in a list, returning a new list containing the
58             transformed elements.
59              
60             This module provides a 'parallel map'. Multiple worker processes are
61             forked so that many instances of the transformation function may be
62             executed simultaneously.
63              
64             For time consuming operations, particularly operations that spend most
65             of their time waiting for I/O, this is a big performance win. It also
66             provides a simple idiom to make effective use of multi CPU systems.
67              
68             There is, however, a considerable overhead associated with forking, so
69             the example in the synopsis (doubling a list of numbers) is I<not> a
70             sensible use of this module.
71              
72             =head2 Example
73              
74             Imagine you have an array of URLs to fetch:
75              
76             my @urls = qw(
77             http://google.com/
78             http://hexten.net/
79             http://search.cpan.org/
80             ... and lots more ...
81             );
82              
83             Write a function that retrieves a URL and returns its contents or undef
84             if it can't be fetched:
85              
86             sub fetch {
87             my ( $id, $url ) = @_; # workers are called with ( index, value )
88             my $resp = $ua->get($url);
89             return unless $resp->is_success;
90             return $resp->content;
91             };
92              
93             Now write a function to synthesize a special kind of iterator:
94              
95             sub list_iter {
96             my @ar = @_;
97             my $pos = 0;
98             return sub {
99             return if $pos >= @ar;
100             my @r = ( $pos, $ar[$pos] ); # Note: returns ( index, value )
101             $pos++;
102             return @r;
103             };
104             }
105              
106             The returned iterator will return each element of the array in turn and
107             then undef. Actually it returns both the index I<and> the value of each
108             element in the array. Because multiple instances of the transformation
109             function execute in parallel the results won't necessarily come back in
110             order. The array index will later allow us to put completed items in the
111             correct place in an output array.
112              
113             Get an iterator for the list of URLs:
114              
115             my $url_iter = list_iter( @urls );
116              
117             Then wrap it in another iterator which will return the transformed results:
118              
119             my $page_iter = iterate( \&fetch, $url_iter );
120              
121             Finally loop over the returned iterator storing results:
122              
123             my @out = ( );
124             while ( my ( $index, $value ) = $page_iter->() ) {
125             $out[$index] = $value;
126             }
127              
128             Behind the scenes your program forked into ten (by default) instances of
129             itself and executed the page requests in parallel.
130              
131             =head2 Simpler interfaces
132              
133             Having to construct an iterator is a pain so C<iterate> is smart enough
134             to do that for you. Instead of passing an iterator just pass a reference
135             to the array:
136              
137             my $page_iter = iterate( \&fetch, \@urls );
138              
139             If you pass a hash reference the iterator you get back will return key,
140             value pairs:
141              
142             my $some_iter = iterate( \&fetch, \%some_hash );
143              
144             If the returned iterator is inconvenient you can get back a hash or
145             array instead:
146              
147             my @done = iterate_as_array( \&fetch, \@urls );
148              
149             my %done = iterate_as_hash( \&worker, \%jobs );
150              
151             =head2 How It Works
152              
153             The current process is forked once for each worker. Each forked child is
154             connected to the parent by a pair of pipes. The child's STDIN, STDOUT
155             and STDERR are unaffected.
156              
157             Input values are serialised (using Storable) and passed to the workers.
158             Completed work items are serialised and returned.
159              
160             =head2 Caveats
161              
162             Parallel::Iterator is designed to be simple to use - but the underlying
163             forking of the main process can cause mystifying problems unless you
164             have an understanding of what is going on behind the scenes.
165              
166             =head3 Worker execution environment
167              
168             All code apart from the worker subroutine executes in the parent process
169             as normal. The worker executes in a forked instance of the parent
170             process. That means that things like this won't work as expected:
171              
172             my %tally = ();
173             my @r = iterate_as_array( sub {
174             my ($id, $name) = @_;
175             $tally{$name}++; # might not do what you think it does
176             return reverse $name;
177             }, \@names );
178              
179             # Now print out the tally...
180             while ( my ( $name, $count ) = each %tally ) {
181             printf("%5d : %s\n", $count, $name);
182             }
183              
184             Because the worker is a closure it can see the C<%tally> hash from its
185             enclosing scope; but because it's running in a forked clone of the parent
186             process it modifies its own copy of C<%tally> rather than the copy for
187             the parent process.
188              
189             That means that after the job terminates the C<%tally> in the parent
190             process will be empty.
191              
192             In general you should avoid side effects in your worker subroutines.
193              
194             =head3 Serialization
195              
196             Values are serialised using L<Storable> to pass to the worker subroutine
197             and results from the worker are again serialised before being passed
198             back. Be careful what your values refer to: everything has to be
199             serialised. If there's an indirect way to reach a large object graph
200             Storable will find it and performance will suffer.
201              
202             To find out how large your serialised values are serialise one of them
203             and check its size:
204              
205             use Storable qw( freeze );
206             my $serialized = freeze $some_obj;
207             print length($serialized), " bytes\n";
208              
209             In your tests you may wish to guard against the possibility of a change
210             to the structure of your values resulting in a sudden increase in
211             serialized size:
212              
213             ok length(freeze $some_obj) < 1000, "Object too bulky?";
214              
215             See the documentation for L<Storable> for other caveats.
216              
217             =head3 Performance
218              
219             Process forking is expensive. Only use Parallel::Iterator in cases where:
220              
221             =over
222              
223             =item the worker waits for I/O
224              
225             The case of fetching web pages is a good example of this. Fetching a
226             page with LWP::UserAgent may take as long as a few seconds but probably
227             consumes only a few milliseconds of processor time. Running many
228             requests in parallel is a huge win - but be kind to the server you're
229             talking to: don't launch a lot of parallel requests unless it's your
230             server or you know it can handle the load.
231              
232             =item the worker is CPU intensive and you have multiple cores / CPUs
233              
234             If the worker is doing an expensive calculation you can parallelise that
235             across multiple CPU cores. Benchmark first though. There's a
236             considerable overhead associated with Parallel::Iterator; unless your
237             calculations are time consuming that overhead will dwarf whatever time
238             they take.
239              
240             =back
241              
242             =head1 INTERFACE
243              
244             =head2 C<< iterate( [ $options ], $worker, $iterator ) >>
245              
246             Get an iterator that applies the supplied transformation function to
247             each value returned by the input iterator.
248              
249             Instead of an iterator you may pass an array or hash reference and
250             C<iterate> will convert it internally into a suitable iterator.
251              
252             If you are doing this you may wish to investigate C<iterate_as_array> and
253             C<iterate_as_hash>.
254              
255             =head3 Options
256              
257             A reference to a hash of options may be supplied as the first argument.
258             The following options are supported:
259              
260             =over
261              
262             =item C<workers>
263              
264             The number of concurrent processes to launch. Set this to 0 to disable
265             forking. Defaults to 10 on systems that support fork and 0 (disable
266             forking) on those that do not.
267              
268             =item C<nowarn>
269              
270             Normally C<iterate> will issue a warning and fall back to single process
271             mode on systems on which fork is not available. This option suppresses
272             that warning.
273              
274             =item C<batch>
275              
276             Ordinarily items are passed to the worker one at a time. If you are
277             processing a large number of items it may be more efficient to process
278             them in batches. Specify the batch size using this option.
279              
280             Batching is transparent from the caller's perspective. Internally it
281             modifies the iterators and worker (by wrapping them in additional
282             closures) so that they pack, process and unpack chunks of work.
283              
284             =item C<adaptive>
285              
286             Extending the idea of batching a number of work items to amortize the
287             overhead of passing work to and from parallel workers you may also ask
288             C<iterate> to heuristically determine the batch size by setting the
289             C<adaptive> option to a numeric value.
290              
291             The batch size will be computed as
292              
293             <number of items seen> / <number of workers> / <adaptive>
294              
295             A larger value for C<adaptive> will reduce the rate at which the batch
296             size increases. Good values tend to be in the range 1 to 2.
297              
298             You can also specify lower and, optionally, upper bounds on the batch
299             size by passing a reference to an array containing ( lower bound,
300             growth ratio, upper bound ). The upper bound may be omitted.
301              
302             my $iter = iterate(
303             { adaptive => [ 5, 2, 100 ] },
304             $worker, \@stuff );
305              
306             =item C<onerror>
307              
308             The action to take when an error is thrown in the iterator. Possible
309             values are 'die', 'warn' or a reference to a subroutine that will be
310             called with the index of the job that threw the exception and the value
311             of C<$@> thrown.
312              
313             iterate( {
314             onerror => sub {
315             my ($id, $err) = @_;
316             $self->log( "Error for index $id: $err" );
317             },
318             }, $worker,
319             \@jobs
320             );
321              
322             The default is 'die'.
323            
324             =back
325              
326             =cut
327              
328             sub _massage_iterator {
329 27     27   50 my $iter = shift;
330 27 100       146 if ( 'ARRAY' eq ref $iter ) {
    100          
    50          
331 21         3355 my @ar = @$iter;
332 21         41 my $pos = 0;
333             return sub {
334 50202 100   50202   101967 return if $pos >= @ar;
335 50126         103271 my @r = ( $pos, $ar[$pos] );
336 50126         55911 $pos++;
337 50126         217466 return @r;
338 21         168 };
339             }
340             elsif ( 'HASH' eq ref $iter ) {
341 2         12 my %h = %$iter;
342 2         12 my @k = keys %h;
343             return sub {
344 12 100   12   41 return unless @k;
345 10         211 my $k = shift @k;
346 10         183 return ( $k, $h{$k} );
347 2         14 };
348             }
349             elsif ( 'CODE' eq ref $iter ) {
350 4         12 return $iter;
351             }
352             else {
353 0         0 croak "Iterator must be a code, array or hash ref";
354             }
355             }
356              
357             sub _nonfork {
358 9     9   17 my ( $options, $worker, $iter ) = @_;
359              
360             return sub {
361 551     551   1050 while ( 1 ) {
362 551 100       1048 if ( my @next = $iter->() ) {
363 542         1459 my ( $id, $work ) = @next;
364             # dclone so that we have the same semantics as the
365             # forked version.
366 542 100 66     48704 $work = dclone $work if defined $work && ref $work;
367 542         1206 my $result = eval { $worker->( $id, $work ) };
  542         1218  
368 542 50       8443 if ( my $err = $@ ) {
369 0         0 $options->{onerror}->( $id, $err );
370             }
371             else {
372 542         2583 return ( $id, $result );
373             }
374             }
375             else {
376 9         37 return;
377             }
378             }
379 9         70 };
380             }
381              
382             # Does this sub look a bit long to you? :)
383             sub _fork {
384 18     18   77 my ( $options, $worker, $iter ) = @_;
385              
386 18         41 my @workers = ();
387 18         35 my @result_queue = ();
388 18         255 my $select = IO::Select->new;
389 18         282 my $rotate = 0;
390              
391             return sub {
392             LOOP: {
393             # Make new workers
394 946   100 946   2270 while ( @workers < $options->{workers} && ( my @next = $iter->() ) )
  1289         4700  
395             {
396              
397 76         8375 my ( $my_rdr, $my_wtr, $child_rdr, $child_wtr )
398             = map IO::Handle->new, 1 .. 4;
399              
400 76 50       17882 pipe $child_rdr, $my_wtr
401             or croak "Can't open write pipe ($!)\n";
402              
403 76 50       2026 pipe $my_rdr, $child_wtr
404             or croak "Can't open read pipe ($!)\n";
405              
406 76 50       119507 if ( my $pid = fork ) {
407             # Parent
408 76         10836 close $_ for $child_rdr, $child_wtr;
409 76         1183 push @workers, $pid;
410 76         5405 $select->add( [ $my_rdr, $my_wtr, 0 ] );
411 76         17387 _put_obj( \@next, $my_wtr );
412             }
413             else {
414             # Child
415 0         0 close $_ for $my_rdr, $my_wtr;
416              
417             # Don't execute any END blocks
418 9     9   7709 use POSIX '_exit';
  9         61221  
  9         73  
419 0         0 eval q{END { _exit 0 }};
420              
421             # Worker loop
422 0         0 while ( defined( my $job = _get_obj( $child_rdr ) ) ) {
423 0         0 my $result = eval { $worker->( @$job ) };
  0         0  
424 0         0 my $err = $@;
425 0 0       0 _put_obj(
426             [
427             $err
428             ? ( 'E', $job->[0], $err )
429             : ( 'R', $job->[0], $result )
430             ],
431             $child_wtr
432             );
433             }
434              
435             # End of stream
436 0         0 _put_obj( undef, $child_wtr );
437 0         0 close $_ for $child_rdr, $child_wtr;
438             # We use CORE::exit for MP compatibility
439 0         0 CORE::exit;
440             }
441             }
442              
443 1289 100       9060 return @{ shift @result_queue } if @result_queue;
  928         21745  
444 361 100       1472 if ( $select->count ) {
445 344         1966 eval {
446 344         1055 my @rdr = $select->can_read;
447             # Anybody got completed work?
448 344         100702 for my $r ( @rdr ) {
449 1014         2610 my ( $rh, $wh, $eof ) = @$r;
450 1014 100       2270 if ( defined( my $results = _get_obj( $rh ) ) ) {
451 939         2090 my $type = shift @$results;
452 939 100       3137 if ( $type eq 'R' ) {
    50          
453 928         1581 push @result_queue, $results;
454             }
455             elsif ( $type eq 'E' ) {
456 11         544 $options->{onerror}->( @$results );
457             }
458             else {
459 0         0 die "Bad result type: $type";
460             }
461              
462             # We operate a strict one in, one out policy
463             # - which avoids deadlocks. Having received
464             # the previous result send a new work value.
465 938 50       2267 unless ( $eof ) {
466 938 100       2174 if ( my @next = $iter->() ) {
467 863         5104 _put_obj( \@next, $wh );
468             }
469             else {
470 75         241 _put_obj( undef, $wh );
471 75         879 close $wh;
472 75         141 @{$r}[ 1, 2 ] = ( undef, 1 );
  75         4361  
473             }
474             }
475             }
476             else {
477 75         281 $select->remove( $r );
478 75         3560 close $rh;
479             }
480             }
481             };
482              
483 344 100       1291 if ( my $err = $@ ) {
484             # Finish all the workers
485 1         9 _put_obj( undef, $_->[1] ) for $select->handles;
486              
487             # And wait for them to exit
488 1         1851 waitpid( $_, 0 ) for @workers;
489              
490             # Rethrow
491 1         13 die $err;
492             }
493              
494 343         820 redo LOOP;
495             }
496 17         24508 waitpid( $_, 0 ) for @workers;
497 17         111 return;
498             }
499 18         273 };
500             }
501              
502             sub _batch_input_iter {
503 10     10   22 my ( $code, $options ) = @_;
504              
505 10 100       47 if ( my $adapt = $options->{adaptive} ) {
506 6   100     35 my $workers = $options->{workers} || 1;
507 6         15 my $count = 0;
508              
509 6 100       35 $adapt = [ 1, $adapt, undef ]
510             unless 'ARRAY' eq ref $adapt;
511              
512 6         20 my ( $min, $ratio, $max ) = @$adapt;
513 6 100 66     67 $min = 1 unless defined $min && $min > 1;
514              
515             return sub {
516 774     774   1581 my @chunk = ();
517              
518             # Adapt batch size
519 774         1840 my $batch = $count / $workers / $ratio;
520 774 100       3337 $batch = $min if $batch < $min;
521 774 100 100     3468 $batch = $max if defined $max && $batch > $max;
522              
523 774   100     3622 while ( @chunk < $batch && ( my @next = $code->() ) ) {
524 30000         50190 push @chunk, \@next;
525 30000         110599 $count++;
526             }
527              
528 774 100       4634 return @chunk ? ( 0, \@chunk ) : ();
529 6         116 };
530             }
531             else {
532 4         8 my $batch = $options->{batch};
533              
534             return sub {
535 226     226   479 my @chunk = ();
536 226   100     1194 while ( @chunk < $batch && ( my @next = $code->() ) ) {
537 20000         93333 push @chunk, \@next;
538             }
539 226 100       1111 return @chunk ? ( 0, \@chunk ) : ();
540 4         42 };
541             }
542             }
543              
544             sub _batch_output_iter {
545 10     10   82 my $code = shift;
546 10         22 my @queue = ();
547             return sub {
548 50010 100   50010   100263 unless ( @queue ) {
549 955 100       1681 if ( my ( undef, $chunk ) = $code->() ) {
550 945         8915 @queue = @$chunk;
551             }
552             else {
553 10         57 return;
554             }
555             }
556 50000         70188 return @{ shift @queue };
  50000         135442  
557 10         63 };
558 0         0 return $code;
559             }
560              
561             sub _batch_worker {
562 10     10   62 my $code = shift;
563             return sub {
564 388     388   808 my ( undef, $chunk ) = @_;
565 388         739 for my $item ( @$chunk ) {
566 25000         97301 $item->[1] = $code->( @$item );
567             }
568 388         2084 return $chunk;
569 10         91 };
570             }
571              
572             sub iterate {
573 27 50   27 1 18683 my %options = ( %DEFAULTS, %{ 'HASH' eq ref $_[0] ? shift : {} } );
  27         354  
574              
575 27 50       135 croak "iterate takes 2 or 3 args" unless @_ == 2;
576              
577 27         119 my @bad_opt = grep { !exists $DEFAULTS{$_} } keys %options;
  135         280  
578 27 50       120 croak "Unknown option(s): ", join( ', ', sort @bad_opt ), "\n"
579             if @bad_opt;
580              
581 27         63 my $worker = shift;
582 27 50       123 croak "Worker must be a coderef"
583             unless 'CODE' eq ref $worker;
584              
585 27         105 my $iter = _massage_iterator( shift );
586              
587 27 100       402 if ( $options{onerror} =~ /^(die|warn)$/ ) {
588 26         3974 $options{onerror} = eval "sub { shift; $1 shift }";
589             }
590              
591 27 50       141 croak "onerror option must be 'die', 'warn' or a code reference"
592             unless 'CODE' eq ref $options{onerror};
593              
594 27 100 100     251 if ( $options{workers} > 0 && $DEFAULTS{workers} == 0 ) {
595 3 50       10 warn "Fork not available; falling back to single process mode\n"
596             unless $options{nowarn};
597 3         6 $options{workers} = 0;
598             }
599              
600 27 100       112 my $factory = $options{workers} == 0 ? \&_nonfork : \&_fork;
601              
602 27 100 100     283 if ( $options{batch} > 1 || $options{adaptive} ) {
603 10         48 return _batch_output_iter(
604             $factory->(
605             \%options,
606             _batch_worker( $worker ),
607             _batch_input_iter( $iter, \%options )
608             )
609             );
610             }
611             else {
612             # OK. Ready. Let's do it.
613 17         142 return $factory->( \%options, $worker, $iter );
614             }
615             }
616              
617             =head2 C<< iterate_as_array >>
618              
619             As C<iterate> but instead of returning an iterator returns an array
620             containing the collected output from the iterator. In a scalar context
621             returns a reference to the same array.
622              
623             For this to work properly the input iterator must return (index, value)
624             pairs. This allows the results to be placed in the correct slots in the
625             output array. The simplest way to do this is to pass an array reference
626             as the input iterator:
627              
628             my @output = iterate_as_array( \&some_handler, \@input );
629              
630             =cut
631              
632             sub iterate_as_array {
633 17     17 1 629948 my $iter = iterate( @_ );
634 17         42 my @out = ();
635 17         55 while ( my ( $index, $value ) = $iter->() ) {
636 50110         148570 $out[$index] = $value;
637             }
638 17 50       14896 return wantarray ? @out : \@out;
639             }
640              
641             =head2 C<< iterate_as_hash >>
642              
643             As C<iterate> but instead of returning an iterator returns a hash
644             containing the collected output from the iterator. In a scalar context
645             returns a reference to the same hash.
646              
647             For this to work properly the input iterator must return (key, value)
648             pairs. This allows the results to be placed in the correct slots in the
649             output hash. The simplest way to do this is to pass a hash reference as
650             the input iterator:
651              
652             my %output = iterate_as_hash( \&some_handler, \%input );
653              
654             =cut
655              
656             sub iterate_as_hash {
657 1     1 1 1703 my $iter = iterate( @_ );
658 1         4 my %out = ();
659 1         9 while ( my ( $key, $value ) = $iter->() ) {
660 5         19 $out{$key} = $value;
661             }
662 1 50       433 return wantarray ? %out : \%out;
663             }
664              
665             sub _get_obj {
666 1014     1014   1722 my $fd = shift;
667 1014         3705 my $r = fd_retrieve $fd;
668 1014         145868 return $r->[0];
669             }
670              
671             sub _put_obj {
672 1015     1015   2073 my ( $obj, $fd ) = @_;
673 1015         5698 store_fd [$obj], $fd;
674 1015         286047 $fd->flush;
675             }
676              
677             1;
678             __END__