File Coverage

blib/lib/Parallel/Iterator.pm
Criterion Covered Total %
statement 175 189 92.5
branch 65 82 79.2
condition 24 26 92.3
subroutine 30 30 100.0
pod 3 3 100.0
total 297 330 90.0


line stmt bran cond sub pod time code
1 9     9   470549 use 5.008; use warnings; use strict;
  9     9   69  
  9     9   46  
  9         15  
  9         227  
  9         38  
  9         16  
  9         316  
2              
3             package Parallel::Iterator;
4              
5 9     9   53 use Carp;
  9         21  
  9         559  
6 9     9   5020 use Storable qw( store_fd fd_retrieve dclone );
  9         26190  
  9         638  
7 9     9   4583 use IO::Handle;
  9         49485  
  9         433  
8 9     9   4612 use IO::Select;
  9         12853  
  9         446  
9 9     9   67 use Config;
  9         20  
  9         409  
10              
11             our $VERSION = '1.002';
12              
13 9     9   46 use Exporter (); *import = \&Exporter::import;
  9         18  
  9         627  
14             our @EXPORT_OK = qw( iterate iterate_as_array iterate_as_hash );
15              
16 9     9   58 use constant IS_WIN32 => ( $^O =~ /^(MS)?Win32$/ );
  9         17  
  9         6059  
17              
18             my %DEFAULTS = (
19             workers => ( ( $Config{d_fork} && !IS_WIN32 ) ? 10 : 0 ),
20             onerror => 'die',
21             nowarn => 0,
22             batch => 1,
23             adaptive => 0,
24             );
25              
26             =head1 NAME
27              
28             Parallel::Iterator - Simple parallel execution
29              
30             =head1 SYNOPSIS
31              
32             use Parallel::Iterator qw( iterate );
33              
34             # A very expensive way to double 100 numbers...
35            
36             my @nums = ( 1 .. 100 );
37            
38             my $iter = iterate( sub {
39             my ( $id, $job ) = @_;
40             return $job * 2;
41             }, \@nums );
42            
43             my @out = ();
44             while ( my ( $index, $value ) = $iter->() ) {
45             $out[$index] = $value;
46             }
47              
48             The C<iterate> function applies a user supplied transformation function to
49             each element in a list, returning a new list containing the
50             transformed elements.
51              
52             =head1 DESCRIPTION
53              
54             This module provides a 'parallel map'. Multiple worker processes are
55             forked so that many instances of the transformation function may be
56             executed simultaneously.
57              
58             For time consuming operations, particularly operations that spend most
59             of their time waiting for I/O, this is a big performance win. It also
60             provides a simple idiom to make effective use of multi CPU systems.
61              
62             There is, however, a considerable overhead associated with forking, so
63             the example in the synopsis (doubling a list of numbers) is I<not> a
64             sensible use of this module.
65              
66             =head1 MANUAL
67              
68             =head2 Basic Usage
69              
70             Imagine you have an array of URLs to fetch:
71              
72             my @urls = qw(
73             http://google.com/
74             http://hexten.net/
75             http://search.cpan.org/
76             ... and lots more ...
77             );
78              
79             Write a function that retrieves a URL and returns its contents or undef
80             if it can't be fetched:
81              
82             sub fetch {
83             my ($id, $url) = @_;
84             my $resp = $ua->get($url);
85             return unless $resp->is_success;
86             return $resp->content;
87             };
88              
89             Now write a function to synthesize a special kind of iterator:
90              
91             sub list_iter {
92             my @ar = @_;
93             my $pos = 0;
94             return sub {
95             return if $pos >= @ar;
96             my @r = ( $pos, $ar[$pos] ); # Note: returns ( index, value )
97             $pos++;
98             return @r;
99             };
100             }
101              
102             The returned iterator will return each element of the array in turn and
103             then undef. Actually it returns both the index I<and> the value of each
104             element in the array. Because multiple instances of the transformation
105             function execute in parallel the results won't necessarily come back in
106             order. The array index will later allow us to put completed items in the
107             correct place in an output array.
108              
109             Get an iterator for the list of URLs:
110              
111             my $url_iter = list_iter( @urls );
112              
113             Then wrap it in another iterator which will return the transformed results:
114              
115             my $page_iter = iterate( \&fetch, $url_iter );
116              
117             Finally loop over the returned iterator storing results:
118              
119             my @out = ( );
120             while ( my ( $index, $value ) = $page_iter->() ) {
121             $out[$index] = $value;
122             }
123              
124             Behind the scenes your program forked into ten (by default) instances of
125             itself and executed the page requests in parallel.
126              
127             =head2 Simpler interfaces
128              
129             Having to construct an iterator is a pain so C<iterate> is smart enough
130             to do that for you. Instead of passing an iterator just pass a reference
131             to the array:
132              
133             my $page_iter = iterate( \&fetch, \@urls );
134              
135             If you pass a hash reference the iterator you get back will return key,
136             value pairs:
137              
138             my $some_iter = iterate( \&fetch, \%some_hash );
139              
140             If the returned iterator is inconvenient you can get back a hash or
141             array instead:
142              
143             my @done = iterate_as_array( \&fetch, \@urls );
144              
145             my %done = iterate_as_hash( \&worker, \%jobs );
146              
147             =head2 How It Works
148              
149             The current process is forked once for each worker. Each forked child is
150             connected to the parent by a pair of pipes. The child's STDIN, STDOUT
151             and STDERR are unaffected.
152              
153             Input values are serialised (using Storable) and passed to the workers.
154             Completed work items are serialised and returned.
155              
156             =head2 Caveats
157              
158             Parallel::Iterator is designed to be simple to use - but the underlying
159             forking of the main process can cause mystifying problems unless you
160             have an understanding of what is going on behind the scenes.
161              
162             =head3 Worker execution environment
163              
164             All code apart from the worker subroutine executes in the parent process
165             as normal. The worker executes in a forked instance of the parent
166             process. That means that things like this won't work as expected:
167              
168             my %tally = ();
169             my @r = iterate_as_array( sub {
170             my ($id, $name) = @_;
171             $tally{$name}++; # might not do what you think it does
172             return reverse $name;
173             }, \@names );
174              
175             # Now print out the tally...
176             while ( my ( $name, $count ) = each %tally ) {
177             printf("%5d : %s\n", $count, $name);
178             }
179              
180             Because the worker is a closure it can see the C<%tally> hash from its
181             enclosing scope; but because it's running in a forked clone of the parent
182             process it modifies its own copy of C<%tally> rather than the copy for
183             the parent process.
184              
185             That means that after the job terminates the C<%tally> in the parent
186             process will be empty.
187              
188             In general you should avoid side effects in your worker subroutines.
189              
190             =head3 Serialization
191              
192             Values are serialised using L<Storable> to pass to the worker subroutine
193             and results from the worker are again serialised before being passed
194             back. Be careful what your values refer to: everything has to be
195             serialised. If there's an indirect way to reach a large object graph
196             Storable will find it and performance will suffer.
197              
198             To find out how large your serialised values are serialise one of them
199             and check its size:
200              
201             use Storable qw( freeze );
202             my $serialized = freeze $some_obj;
203             print length($serialized), " bytes\n";
204              
205             In your tests you may wish to guard against the possibility of a change
206             to the structure of your values resulting in a sudden increase in
207             serialized size:
208              
209             ok length(freeze $some_obj) < 1000, "Object too bulky?";
210              
211             See the documentation for L<Storable> for other caveats.
212              
213             =head3 Performance
214              
215             Process forking is expensive. Only use Parallel::Iterator in cases where:
216              
217             =over
218              
219             =item the worker waits for I/O
220              
221             The case of fetching web pages is a good example of this. Fetching a
222             page with LWP::UserAgent may take as long as a few seconds but probably
223             consumes only a few milliseconds of processor time. Running many
224             requests in parallel is a huge win - but be kind to the server you're
225             talking to: don't launch a lot of parallel requests unless it's your
226             server or you know it can handle the load.
227              
228             =item the worker is CPU intensive and you have multiple cores / CPUs
229              
230             If the worker is doing an expensive calculation you can parallelise that
231             across multiple CPU cores. Benchmark first though. There's a
232             considerable overhead associated with Parallel::Iterator; unless your
233             calculations are time consuming that overhead will dwarf whatever time
234             they take.
235              
236             =back
237              
238             =head1 INTERFACE
239              
240             =head2 C<< iterate( [ $options ], $worker, $iterator ) >>
241              
242             Get an iterator that applies the supplied transformation function to
243             each value returned by the input iterator.
244              
245             Instead of an iterator you may pass an array or hash reference and
246             C<iterate> will convert it internally into a suitable iterator.
247              
248             If you are doing this you may wish to investigate C<iterate_as_array> and
249             C<iterate_as_hash>.
250              
251             =head3 Options
252              
253             A reference to a hash of options may be supplied as the first argument.
254             The following options are supported:
255              
256             =over
257              
258             =item C<workers>
259              
260             The number of concurrent processes to launch. Set this to 0 to disable
261             forking. Defaults to 10 on systems that support fork and 0 (disable
262             forking) on those that do not.
263              
264             =item C<nowarn>
265              
266             Normally C<iterate> will issue a warning and fall back to single process
267             mode on systems on which fork is not available. This option suppresses
268             that warning.
269              
270             =item C<batch>
271              
272             Ordinarily items are passed to the worker one at a time. If you are
273             processing a large number of items it may be more efficient to process
274             them in batches. Specify the batch size using this option.
275              
276             Batching is transparent from the caller's perspective. Internally it
277             modifies the iterators and worker (by wrapping them in additional
278             closures) so that they pack, process and unpack chunks of work.
279              
280             =item C<adaptive>
281              
282             Extending the idea of batching a number of work items to amortize the
283             overhead of passing work to and from parallel workers you may also ask
284             C<iterate> to heuristically determine the batch size by setting the
285             C<adaptive> option to a numeric value.
286              
287             The batch size will be computed as
288              
289             <number of items seen> / <number of workers> / <adaptive>
290              
291             A larger value for C<adaptive> will reduce the rate at which the batch
292             size increases. Good values tend to be in the range 1 to 2.
293              
294             You can also specify lower and, optionally, upper bounds on the batch
295             size by passing a reference to an array containing ( lower bound,
296             growth ratio, upper bound ). The upper bound may be omitted.
297              
298             my $iter = iterate(
299             { adaptive => [ 5, 2, 100 ] },
300             $worker, \@stuff );
301              
302             =item C<onerror>
303              
304             The action to take when an error is thrown in the iterator. Possible
305             values are 'die', 'warn' or a reference to a subroutine that will be
306             called with the index of the job that threw the exception and the value
307             of C<$@> thrown.
308              
309             iterate( {
310             onerror => sub {
311             my ($id, $err) = @_;
312             $self->log( "Error for index $id: $err" );
313             } },
314             $worker,
315             \@jobs
316             );
317              
318             The default is 'die'.
319            
320             =back
321              
322             =cut
323              
324             sub _massage_iterator {
325 27     27   59 my $iter = shift;
326 27 100       154 if ( 'ARRAY' eq ref $iter ) {
    100          
    50          
327 21         2237 my @ar = @$iter;
328 21         51 my $pos = 0;
329             return sub {
330 50202 100   50202   72638 return if $pos >= @ar;
331 50126         67732 my @r = ( $pos, $ar[$pos] );
332 50126         49327 $pos++;
333 50126         108877 return @r;
334 21         169 };
335             }
336             elsif ( 'HASH' eq ref $iter ) {
337 2         11 my %h = %$iter;
338 2         13 my @k = keys %h;
339             return sub {
340 12 100   12   32 return unless @k;
341 10         44 my $k = shift @k;
342 10         54 return ( $k, $h{$k} );
343 2         14 };
344             }
345             elsif ( 'CODE' eq ref $iter ) {
346 4         13 return $iter;
347             }
348             else {
349 0         0 croak "Iterator must be a code, array or hash ref";
350             }
351             }
352              
353             sub _nonfork {
354 9     9   25 my ( $options, $worker, $iter ) = @_;
355              
356             return sub {
357 551     551   730 while ( 1 ) {
358 551 100       827 if ( my @next = $iter->() ) {
359 542         1060 my ( $id, $work ) = @next;
360             # dclone so that we have the same semantics as the
361             # forked version.
362 542 100 66     25802 $work = dclone $work if defined $work && ref $work;
363 542         1161 my $result = eval { $worker->( $id, $work ) };
  542         1157  
364 542 50       8251 if ( my $err = $@ ) {
365 0         0 $options->{onerror}->( $id, $err );
366             }
367             else {
368 542         1480 return ( $id, $result );
369             }
370             }
371             else {
372 9         33 return;
373             }
374             }
375 9         53 };
376             }
377              
378             # Does this sub look a bit long to you? :)
379             sub _fork {
380 18     18   48 my ( $options, $worker, $iter ) = @_;
381              
382 18         35 my @workers = ();
383 18         30 my @result_queue = ();
384 18         265 my $select = IO::Select->new;
385 18         278 my $rotate = 0;
386              
387             return sub {
388             LOOP: {
389             # Make new workers
390 946   100 946   1726 while ( @workers < $options->{workers} && ( my @next = $iter->() ) )
  1282         2911  
391             {
392              
393 76         3409 my ( $my_rdr, $my_wtr, $child_rdr, $child_wtr )
394             = map IO::Handle->new, 1 .. 4;
395              
396 76 50       12746 pipe $child_rdr, $my_wtr
397             or croak "Can't open write pipe ($!)\n";
398              
399 76 50       2438 pipe $my_rdr, $child_wtr
400             or croak "Can't open read pipe ($!)\n";
401              
402 76 50       79461 if ( my $pid = fork ) {
403             # Parent
404 76         5633 close $_ for $child_rdr, $child_wtr;
405 76         1278 push @workers, $pid;
406 76         4511 $select->add( [ $my_rdr, $my_wtr, 0 ] );
407 76         12243 _put_obj( \@next, $my_wtr );
408             }
409             else {
410             # Child
411 0         0 close $_ for $my_rdr, $my_wtr;
412              
413             # Don't execute any END blocks
414 9     9   4428 use POSIX '_exit';
  9         51292  
  9         50  
415 0         0 eval q{END { _exit 0 }};
416              
417             # Worker loop
418 0         0 while ( defined( my $job = _get_obj( $child_rdr ) ) ) {
419 0         0 my $result = eval { $worker->( @$job ) };
  0         0  
420 0         0 my $err = $@;
421 0 0       0 _put_obj(
422             [
423             $err
424             ? ( 'E', $job->[0], $err )
425             : ( 'R', $job->[0], $result )
426             ],
427             $child_wtr
428             );
429             }
430              
431             # End of stream
432 0         0 _put_obj( undef, $child_wtr );
433 0         0 close $_ for $child_rdr, $child_wtr;
434             # We use CORE::exit for MP compatibility
435 0         0 CORE::exit;
436             }
437             }
438              
439 1282 100       2197 return @{ shift @result_queue } if @result_queue;
  928         2515  
440 354 100       1414 if ( $select->count ) {
441 337         1721 eval {
442 337         845 my @rdr = $select->can_read;
443             # Anybody got completed work?
444 337         31946 for my $r ( @rdr ) {
445 1014         2852 my ( $rh, $wh, $eof ) = @$r;
446 1014 100       1801 if ( defined( my $results = _get_obj( $rh ) ) ) {
447 939         1497 my $type = shift @$results;
448 939 100       1898 if ( $type eq 'R' ) {
    50          
449 928         1585 push @result_queue, $results;
450             }
451             elsif ( $type eq 'E' ) {
452 11         338 $options->{onerror}->( @$results );
453             }
454             else {
455 0         0 die "Bad result type: $type";
456             }
457              
458             # We operate a strict one in, one out policy
459             # - which avoids deadlocks. Having received
460             # the previous result send a new work value.
461 938 50       1705 unless ( $eof ) {
462 938 100       1786 if ( my @next = $iter->() ) {
463 863         2906 _put_obj( \@next, $wh );
464             }
465             else {
466 75         235 _put_obj( undef, $wh );
467 75         748 close $wh;
468 75         162 @{$r}[ 1, 2 ] = ( undef, 1 );
  75         914  
469             }
470             }
471             }
472             else {
473 75         259 $select->remove( $r );
474 75         3609 close $rh;
475             }
476             }
477             };
478              
479 337 100       1285 if ( my $err = $@ ) {
480             # Finish all the workers
481 1         12 _put_obj( undef, $_->[1] ) for $select->handles;
482              
483             # And wait for them to exit
484 1         1446 waitpid( $_, 0 ) for @workers;
485              
486             # Rethrow
487 1         12 die $err;
488             }
489              
490 336         746 redo LOOP;
491             }
492 17         16786 waitpid( $_, 0 ) for @workers;
493 17         190 return;
494             }
495 18         215 };
496             }
497              
498             sub _batch_input_iter {
499 10     10   40 my ( $code, $options ) = @_;
500              
501 10 100       30 if ( my $adapt = $options->{adaptive} ) {
502 6   100     55 my $workers = $options->{workers} || 1;
503 6         11 my $count = 0;
504              
505 6 100       32 $adapt = [ 1, $adapt, undef ]
506             unless 'ARRAY' eq ref $adapt;
507              
508 6         27 my ( $min, $ratio, $max ) = @$adapt;
509 6 100 66     49 $min = 1 unless defined $min && $min > 1;
510              
511             return sub {
512 774     774   1361 my @chunk = ();
513              
514             # Adapt batch size
515 774         2042 my $batch = $count / $workers / $ratio;
516 774 100       1697 $batch = $min if $batch < $min;
517 774 100 100     2143 $batch = $max if defined $max && $batch > $max;
518              
519 774   100     2567 while ( @chunk < $batch && ( my @next = $code->() ) ) {
520 30000         36088 push @chunk, \@next;
521 30000         53941 $count++;
522             }
523              
524 774 100       2384 return @chunk ? ( 0, \@chunk ) : ();
525 6         75 };
526             }
527             else {
528 4         9 my $batch = $options->{batch};
529              
530             return sub {
531 226     226   414 my @chunk = ();
532 226   100     1001 while ( @chunk < $batch && ( my @next = $code->() ) ) {
533 20000         37724 push @chunk, \@next;
534             }
535 226 100       723 return @chunk ? ( 0, \@chunk ) : ();
536 4         36 };
537             }
538             }
539              
540             sub _batch_output_iter {
541 10     10   22 my $code = shift;
542 10         19 my @queue = ();
543             return sub {
544 50010 100   50010   65810 unless ( @queue ) {
545 955 100       1300 if ( my ( undef, $chunk ) = $code->() ) {
546 945         4485 @queue = @$chunk;
547             }
548             else {
549 10         50 return;
550             }
551             }
552 50000         53506 return @{ shift @queue };
  50000         89345  
553 10         77 };
554 0         0 return $code;
555             }
556              
557             sub _batch_worker {
558 10     10   20 my $code = shift;
559             return sub {
560 388     388   612 my ( undef, $chunk ) = @_;
561 388         614 for my $item ( @$chunk ) {
562 25000         75595 $item->[1] = $code->( @$item );
563             }
564 388         1354 return $chunk;
565 10         87 };
566             }
567              
568             sub iterate {
569 27 50   27 1 12050 my %options = ( %DEFAULTS, %{ 'HASH' eq ref $_[0] ? shift : {} } );
  27         371  
570              
571 27 50       140 croak "iterate takes 2 or 3 args" unless @_ == 2;
572              
573 27         103 my @bad_opt = grep { !exists $DEFAULTS{$_} } keys %options;
  135         260  
574 27 50       88 croak "Unknown option(s): ", join( ', ', sort @bad_opt ), "\n"
575             if @bad_opt;
576              
577 27         59 my $worker = shift;
578 27 50       100 croak "Worker must be a coderef"
579             unless 'CODE' eq ref $worker;
580              
581 27         113 my $iter = _massage_iterator( shift );
582              
583 27 100       362 if ( $options{onerror} =~ /^(die|warn)$/ ) {
584 26         3192 $options{onerror} = eval "sub { shift; $1 shift }";
585             }
586              
587             croak "onerror option must be 'die', 'warn' or a code reference"
588 27 50       129 unless 'CODE' eq ref $options{onerror};
589              
590 27 100 100     236 if ( $options{workers} > 0 && $DEFAULTS{workers} == 0 ) {
591             warn "Fork not available; falling back to single process mode\n"
592 3 50       9 unless $options{nowarn};
593 3         6 $options{workers} = 0;
594             }
595              
596 27 100       113 my $factory = $options{workers} == 0 ? \&_nonfork : \&_fork;
597              
598 27 100 100     178 if ( $options{batch} > 1 || $options{adaptive} ) {
599 10         69 return _batch_output_iter(
600             $factory->(
601             \%options,
602             _batch_worker( $worker ),
603             _batch_input_iter( $iter, \%options )
604             )
605             );
606             }
607             else {
608             # OK. Ready. Let's do it.
609 17         64 return $factory->( \%options, $worker, $iter );
610             }
611             }
612              
613             =head2 C<< iterate_as_array >>
614              
615             As C<iterate> but instead of returning an iterator returns an array
616             containing the collected output from the iterator. In a scalar context
617             returns a reference to the same array.
618              
619             For this to work properly the input iterator must return (index, value)
620             pairs. This allows the results to be placed in the correct slots in the
621             output array. The simplest way to do this is to pass an array reference
622             as the input iterator:
623              
624             my @output = iterate_as_array( \&some_handler, \@input );
625              
626             =cut
627              
628             sub iterate_as_array {
629 17     17 1 464209 my $iter = iterate( @_ );
630 17         47 my @out = ();
631 17         57 while ( my ( $index, $value ) = $iter->() ) {
632 50110         68555 $out[$index] = $value;
633             }
634 17 50       6360 return wantarray ? @out : \@out;
635             }
636              
637             =head2 C<< iterate_as_hash >>
638              
639             As C<iterate> but instead of returning an iterator returns a hash
640             containing the collected output from the iterator. In a scalar context
641             returns a reference to the same hash.
642              
643             For this to work properly the input iterator must return (key, value)
644             pairs. This allows the results to be placed in the correct slots in the
645             output hash. The simplest way to do this is to pass a hash reference as
646             the input iterator:
647              
648             my %output = iterate_as_hash( \&some_handler, \%input );
649              
650             =cut
651              
652             sub iterate_as_hash {
653 1     1 1 1141 my $iter = iterate( @_ );
654 1         14 my %out = ();
655 1         3 while ( my ( $key, $value ) = $iter->() ) {
656 5         24 $out{$key} = $value;
657             }
658 1 50       93 return wantarray ? %out : \%out;
659             }
660              
661             sub _get_obj {
662 1014     1014   1368 my $fd = shift;
663 1014         2523 my $r = fd_retrieve $fd;
664 1014         65632 return $r->[0];
665             }
666              
667             sub _put_obj {
668 1015     1015   1704 my ( $obj, $fd ) = @_;
669 1015         4858 store_fd [$obj], $fd;
670 1015         108754 $fd->flush;
671             }
672              
673             1;
674              
675             __END__