File Coverage

blib/lib/Parallel/Map/Segmented.pm
Criterion Covered Total %
statement 32 35 91.4
branch 5 8 62.5
condition 2 3 66.6
subroutine 9 10 90.0
pod 1 1 100.0
total 49 57 85.9


line stmt bran cond sub pod time code
1             package Parallel::Map::Segmented;
2             $Parallel::Map::Segmented::VERSION = '0.4.0';
3 5     5   347689 use strict;
  5         53  
  5         153  
4 5     5   25 use warnings;
  5         9  
  5         131  
5 5     5   2437 use autodie;
  5         80111  
  5         23  
6 5     5   35014 use 5.014;
  5         35  
7              
8 5     5   2999 use Parallel::ForkManager::Segmented::Base v0.4.0;
  5         35644  
  5         170  
9 5     5   36 use parent 'Parallel::ForkManager::Segmented::Base';
  5         12  
  5         33  
10              
11 5     5   2608 use Parallel::Map qw/ pmap_void /;
  5         515960  
  5         1502  
12              
13             sub run
14             {
15 5     5 1 58589 my ( $self, $args ) = @_;
16              
17 5         49 my $processed = $self->process_args($args);
18 5 100       207 return if not $processed;
19             my ( $WITH_PM, $batch_cb, $batch_size, $nproc, $stream_cb, ) =
20 4         14 @{$processed}{qw/ WITH_PM batch_cb batch_size nproc stream_cb /};
  4         20  
21              
22 4         19 my $batch = $stream_cb->( { size => 1 } )->{items};
23 4 50       62 return if not defined $batch;
24 4         16 $batch_cb->($batch);
25 4 50       6108 if ($WITH_PM)
26             {
27 0     0   0 pmap_void sub { $batch_cb->(shift); Future->done; }, generate => sub {
  0         0  
28 28   66 28   157989 return $stream_cb->( { size => $batch_size } )->{items} // ();
29             },
30 4 50       59 ( $nproc ? ( forks => $nproc ) : () ),
31             ;
32             }
33             else
34             {
35 0         0 $self->serial_run($processed);
36             }
37 4         28686 return;
38             }
39              
40             1;
41              
42             __END__