File Coverage

blib/lib/Parallel/ForkManager/Segmented/Base.pm
Criterion Covered Total %
statement 41 48 85.4
branch 6 10 60.0
condition 3 8 37.5
subroutine 9 10 90.0
pod 3 3 100.0
total 62 79 78.4


line stmt bran cond sub pod time code
1             package Parallel::ForkManager::Segmented::Base;
2             $Parallel::ForkManager::Segmented::Base::VERSION = '0.4.0';
3 1     1   86062 use strict;
  1         12  
  1         30  
4 1     1   6 use warnings;
  1         2  
  1         54  
5 1     1   501 use autodie;
  1         15382  
  1         5  
6 1     1   7172 use 5.014;
  1         18  
7              
8             sub new
9             {
10 1     1 1 20117 my $class = shift;
11              
12 1         4 my $self = bless {}, $class;
13              
14 1         8 $self->_init(@_);
15              
16 1         12 return $self;
17             }
18              
19             sub _init
20             {
21 1     1   5 my ( $self, $args ) = @_;
22              
23 1         2 return;
24             }
25              
26             sub process_args
27             {
28 1     1 1 12 my ( $self, $args ) = @_;
29              
30 1         4 my $WITH_PM = !$args->{disable_fork};
31 1         2 my $items = $args->{items};
32 1         4 my $stream_cb = $args->{stream_cb};
33 1         2 my $cb = $args->{process_item};
34 1         3 my $batch_cb = $args->{process_batch};
35              
36 1 50 33     5 if ( $stream_cb && $items )
37             {
38 0         0 die "Do not specify both stream_cb and items!";
39             }
40 1 50 33     9 if ( $batch_cb && $cb )
41             {
42 0         0 die "Do not specify both process_item and process_batch!";
43             }
44             $batch_cb //= sub {
45 0     0   0 foreach my $item ( @{ shift() } )
  0         0  
46             {
47 0         0 $cb->($item);
48             }
49 0         0 return;
50 1   50     4 };
51 1         4 my $nproc = $args->{nproc};
52 1         3 my $batch_size = $args->{batch_size};
53              
54             # Return prematurely on empty input to avoid calling $ch with undef()
55             # at least once.
56 1 50       5 if ($items)
57             {
58 1 50       6 if ( not @$items )
59             {
60 0         0 return;
61             }
62             $stream_cb = sub {
63 5     5   13784 my ($args) = @_;
64 5         13 my $size = $args->{size};
65              
66 5 100       37 return +{ items =>
67             scalar( @$items ? [ splice @$items, 0, $size ] : undef() ),
68             };
69 1         6 };
70             }
71             return +{
72 1         10 WITH_PM => $WITH_PM,
73             batch_cb => $batch_cb,
74             batch_size => $batch_size,
75             nproc => $nproc,
76             stream_cb => $stream_cb,
77             };
78             }
79              
80             sub serial_run
81             {
82 1     1 1 3 my ( $self, $processed ) = @_;
83             my ( $WITH_PM, $batch_cb, $batch_size, $nproc, $stream_cb, ) =
84 1         15 @{$processed}{qw/ WITH_PM batch_cb batch_size nproc stream_cb /};
  1         9  
85              
86 1         5 while (
87             defined( my $batch = $stream_cb->( { size => $batch_size } )->{items} )
88             )
89             {
90 4         14 $batch_cb->($batch);
91             }
92 1         5 return;
93             }
94              
95             1;
96              
97             __END__