File Coverage

blib/lib/Parallel/Pipes/App.pm
Criterion Covered Total %
statement 59 60 98.3
branch 18 28 64.2
condition n/a
subroutine 8 8 100.0
pod 2 2 100.0
total 87 98 88.7


line stmt bran cond sub pod time code
1             package Parallel::Pipes::App;
2 11     11   229537 use strict;
  11         121  
  11         286  
3 11     11   55 use warnings;
  11         11  
  11         308  
4              
5 11     11   3630 use Parallel::Pipes;
  11         33  
  11         5379  
6              
7             our $VERSION = '0.102';
8              
9 644 100   644   3155 sub _min { $_[0] < $_[1] ? $_[0] : $_[1] }
10              
11             sub run {
12 34     34 1 123999 my ($class, %argv) = @_;
13              
14 34 50       153 my $work = $argv{work} or die "need 'work' argument\n";
15 34 50       1329 my $num = $argv{num} or die "need 'num' argument\n";
16 34 50       146 my $tasks = $argv{tasks} or die "need 'tasks' argument\n";
17 34         68 my $before_work = $argv{before_work};
18 34         79 my $after_work = $argv{after_work};
19              
20 34         259 my $pipes = Parallel::Pipes->new($num, $work);
21 24         48 while (1) {
22 695         7698 my @ready = $pipes->is_ready;
23 695 100       1840 if (my @written = grep { $_->is_written } @ready) {
  869         3148  
24 638         2461 for my $written (@written) {
25 716         3820 my $result = $written->read;
26 716 50       2749 $after_work->($result) if $after_work;
27             }
28             }
29 695 100       4492 if (@$tasks) {
30 644         1024 my $min = _min $#{$tasks}, $#ready;
  644         2505  
31 644         2089 for my $i (0 .. $min) {
32 744         3055 my $task = shift @$tasks;
33 744 50       1666 $before_work->($task) if $before_work;
34 744         3759 $ready[$i]->write($task);
35             }
36             } else {
37 51 100       140 if (@ready == $num) {
38 24         59 last;
39             } else {
40 27 50       102 if (my @written = $pipes->is_written) {
41 27         60 my @ready = $pipes->is_ready(@written);
42 27         67 for my $written (@ready) {
43 28         111 my $result = $written->read;
44 28 50       93 $after_work->($result) if $after_work;
45             }
46             } else {
47 0         0 die "unexpected";
48             }
49             }
50             }
51             }
52 24         113 $pipes->close;
53 24         404 1;
54             }
55              
56             sub map :method {
57 22     22 1 131142 my ($class, %argv) = @_;
58              
59 22 50       110 my $orig_num = $argv{num} or die "need 'num' argument\n";
60 22 50       88 my $orig_tasks = $argv{tasks} or die "need 'tasks' argument\n";
61 22 50       66 my $orig_work = $argv{work} or die "need 'work' argument\n";
62              
63 22         66 my @task = map { [$_, $orig_tasks->[$_]] } 0..$#{$orig_tasks};
  682         946  
  22         77  
64             my $work = sub {
65 372     372   914 my ($index, $task) = @{$_[0]};
  372         1241  
66 372         1315 my $result = $orig_work->($task);
67 372         3893817 [$index, $result];
68 22         143 };
69 22         33 my @result;
70             my $after_work = sub {
71 527     527   883 my ($index, $result) = @{$_[0]};
  527         3844  
72 527         2233 $result[$index] = $result;
73 22         77 };
74 22         143 $class->run(num => $orig_num, work => $work, tasks => \@task, after_work => $after_work);
75 17         416 @result;
76             }
77              
78             1;
79             __END__