File Coverage

blib/lib/Parallel/Map/Segmented.pm
Criterion Covered Total %
statement 26 30 86.6
branch 5 8 62.5
condition 2 3 66.6
subroutine 7 8 87.5
pod 1 1 100.0
total 41 50 82.0


line stmt bran cond sub pod time code
1             package Parallel::Map::Segmented;
2             $Parallel::Map::Segmented::VERSION = '0.2.2';
3 5     5   344365 use strict;
  5         55  
  5         158  
4 5     5   27 use warnings;
  5         9  
  5         125  
5 5     5   132 use 5.014;
  5         16  
6              
7 5     5   2334 use parent 'Parallel::ForkManager::Segmented::Base';
  5         1472  
  5         27  
8              
9 5     5   7438 use Parallel::Map qw/ pmap_void /;
  5         525069  
  5         1397  
10              
11             sub run
12             {
13 5     5 1 59015 my ( $self, $args ) = @_;
14              
15 5         39 my $processed = $self->process_args($args);
16 5 100       184 return if not $processed;
17             my ( $WITH_PM, $batch_cb, $batch_size, $nproc, $stream_cb, ) =
18 4         16 @{$processed}{qw/ WITH_PM batch_cb batch_size nproc stream_cb /};
  4         18  
19              
20 4         21 my $batch = $stream_cb->( { size => 1 } )->{items};
21 4 50       63 return if not defined $batch;
22 4         16 $batch_cb->($batch);
23 4 50       6665 if ($WITH_PM)
24             {
25 0     0   0 pmap_void sub { $batch_cb->(shift); Future->done; }, generate => sub {
  0         0  
26 28   66 28   160024 return $stream_cb->( { size => $batch_size } )->{items} // ();
27             },
28 4 50       76 ( $nproc ? ( forks => $nproc ) : () ),
29             ;
30             }
31             else
32             {
33 0         0 while (
34             defined(
35             $batch = $stream_cb->( { size => $batch_size } )->{items}
36             )
37             )
38             {
39 0         0 $batch_cb->($batch);
40             }
41             }
42 4         29965 return;
43             }
44              
45             1;
46              
47             __END__