File Coverage

blib/lib/Parallel/Pipes/App.pm
Criterion Covered Total %
statement 43 46 93.4
branch 15 24 62.5
condition n/a
subroutine 5 5 100.0
pod 1 1 100.0
total 64 76 84.2


line stmt bran cond sub pod time code
1             package Parallel::Pipes::App;
2 6     6   123486 use strict;
  6         48  
  6         150  
3 6     6   24 use warnings;
  6         6  
  6         180  
4              
5 6     6   1992 use Parallel::Pipes;
  6         12  
  6         2214  
6              
7             our $VERSION = '0.101';
8              
9 199 50   199   1037 sub _min { $_[0] < $_[1] ? $_[0] : $_[1] }
10              
11             sub run {
12 12     12 1 44544 my ($class, %argv) = @_;
13              
14 12 50       60 my $work = $argv{work} or die "need 'work' argument\n";
15 12 50       36 my $num = $argv{num} or die "need 'num' argument\n";
16 12 50       36 my $tasks = $argv{tasks} or die "need 'tasks' argument\n";
17              
18 12         30 my $before_work = $argv{before_work};
19 12         18 my $after_work = $argv{after_work};
20              
21 12         30 my @result;
22 12         60 my $pipes = Parallel::Pipes->new($num, $work);
23 7         14 while (1) {
24 208         1447 my @ready = $pipes->is_ready;
25 208 100       577 if (my @written = grep { $_->is_written } @ready) {
  233         767  
26 199         713 for my $written (@written) {
27 213         784 my $result = $written->read;
28 213 50       792 if ($after_work) {
29 0         0 $after_work->($result);
30             } else {
31 213         899 push @result, $result;
32             }
33             }
34             }
35 208 100       655 if (@$tasks) {
36 199         360 my $min = _min $#{$tasks}, $#ready;
  199         1228  
37 199         839 for my $i (0 .. $min) {
38 217         557 my $task = shift @$tasks;
39 217 50       675 $before_work->($task) if $before_work;
40 217         913 $ready[$i]->write($task);
41             }
42             } else {
43 9 100       44 if (@ready == $num) {
44 7         33 last;
45             } else {
46 2 50       14 if (my @written = $pipes->is_written) {
47 2         5 my @ready = $pipes->is_ready(@written);
48 2         7 for my $written (@ready) {
49 4         15 my $result = $written->read;
50 4 50       21 if ($after_work) {
51 0         0 $after_work->($result);
52             } else {
53 4         14 push @result, $result;
54             }
55             }
56             } else {
57 0         0 die "unexpected";
58             }
59             }
60             }
61             }
62 7         40 $pipes->close;
63 7 50       265 $after_work ? 1 : @result;
64             }
65              
66             1;
67             __END__