File Coverage

blib/lib/Parallel/Pipes/App.pm
Criterion Covered Total %
statement 63 64 98.4
branch 22 32 68.7
condition n/a
subroutine 8 8 100.0
pod 2 2 100.0
total 95 106 89.6


line stmt bran cond sub pod time code
1             package Parallel::Pipes::App;
2 22     22   276749 use strict;
  22         132  
  22         627  
3 22     22   110 use warnings;
  22         44  
  22         550  
4              
5 22     22   4411 use Parallel::Pipes;
  22         55  
  22         15059  
6              
7             our $VERSION = '0.200';
8              
9 690 50   690   3110 sub _min { $_[0] < $_[1] ? $_[0] : $_[1] }
10              
11             sub run {
12 41     41 1 140345 my ($class, %argv) = @_;
13              
14 41 50       221 my $work = $argv{work} or die "need 'work' argument\n";
15 41 50       162 my $num = $argv{num} or die "need 'num' argument\n";
16 41 50       125 my $tasks = $argv{tasks} or die "need 'tasks' argument\n";
17 41         127 my $before_work = $argv{before_work};
18 41         88 my $after_work = $argv{after_work};
19 41         82 my $init_work = $argv{init_work};
20 41         99 my $idle_tick = $argv{idle_tick};
21 41         105 my $idle_work = $argv{idle_work};
22              
23 41 100       356 my $pipes = Parallel::Pipes->new(
24             $num,
25             $work,
26             $idle_tick ? { idle_tick => $idle_tick, idle_work => $idle_work } : (),
27             );
28 26 100       153 $init_work->($pipes) if $init_work;
29 26         2146 while (1) {
30 746         4424 my @ready = $pipes->is_ready;
31 746 100       1814 if (my @written = grep { $_->is_written } @ready) {
  882         2364  
32 690         1778 for my $written (@written) {
33 718         1915 my $result = $written->read;
34 718 50       2851 $after_work->($result, $written) if $after_work;
35             }
36             }
37 746 100       5537 if (@$tasks) {
38 690         1239 my $min = _min $#{$tasks}, $#ready;
  690         3033  
39 690         2281 for my $i (0 .. $min) {
40 750         1988 my $task = shift @$tasks;
41 750 100       1959 $before_work->($task, $ready[$i]) if $before_work;
42 750         4591 $ready[$i]->write($task);
43             }
44             } else {
45 56 100       417 if (@ready == $num) {
46 26         99 last;
47             } else {
48 30 50       109 if (my @written = $pipes->is_written) {
49 30         88 my @ready = $pipes->is_ready(@written);
50 30         88 for my $written (@ready) {
51 32         824 my $result = $written->read;
52 32 50       126 $after_work->($result, $written) if $after_work;
53             }
54             } else {
55 0         0 die "unexpected";
56             }
57             }
58             }
59             }
60 26         165 $pipes->close;
61 26         783 1;
62             }
63              
64             sub map :method {
65 22     22 1 94270 my ($class, %argv) = @_;
66              
67 22 50       99 my $orig_num = $argv{num} or die "need 'num' argument\n";
68 22 50       77 my $orig_tasks = $argv{tasks} or die "need 'tasks' argument\n";
69 22 50       66 my $orig_work = $argv{work} or die "need 'work' argument\n";
70              
71 22         77 my @task = map { [$_, $orig_tasks->[$_]] } 0..$#{$orig_tasks};
  682         1298  
  22         77  
72             my $work = sub {
73 372     372   610 my ($index, $task) = @{$_[0]};
  372         873  
74 372         1227 my $result = $orig_work->($task);
75 372         3823018 [$index, $result];
76 22         110 };
77 22         44 my @result;
78             my $after_work = sub {
79 527     527   819 my ($index, $result) = @{$_[0]};
  527         1489  
80 527         2120 $result[$index] = $result;
81 22         66 };
82 22         110 $class->run(num => $orig_num, work => $work, tasks => \@task, after_work => $after_work);
83 17         479 @result;
84             }
85              
86             1;
87             __END__