File Coverage

blib/lib/Process/Pipeline.pm
Criterion Covered Total %
statement 131 144 90.9
branch 39 64 60.9
condition 7 12 58.3
subroutine 24 25 96.0
pod 3 3 100.0
total 204 248 82.2


line stmt bran cond sub pod time code
1             package Process::Pipeline;
2 19     19   159326 use 5.008001;
  19         38  
3 19     19   67 use strict;
  19         28  
  19         287  
4 19     19   48 use warnings;
  19         76  
  19         381  
5              
6 19     19   12179 use File::Temp ();
  19         323171  
  19         4381  
7              
8             our $VERSION = '0.04';
9              
10             {
11             package Process::Pipeline::Process;
12             my %SUPPORT_SET = map { $_ => 1 } qw(< > >> 2> 2>> 2>&1);
13             sub new {
14 121     121   130 my $class = shift;
15 121         324 bless { cmd => [], set => {} }, $class;
16             }
17             sub cmd {
18 219     219   940 my ($self, @arg) = @_;
19 219 100       1052 if (@arg) {
20 121 50       200 if (ref $arg[0] eq 'CODE') {
21 0         0 $self->{cmd} = $arg[0];
22             } else {
23 121         423 $self->{cmd} = \@arg;
24             }
25             }
26 219         2107 $self->{cmd};
27             }
28             sub set {
29 73     73   261 my ($self, $key, $value) = @_;
30 73 100       187 if ($key) {
31 10 50       40 die "Unsupport set '$key'" unless $SUPPORT_SET{$key};
32 10         20 $self->{set}{$key} = $value;
33             }
34 73         529 $self->{set};
35             }
36             }
37              
38             {
39             package Process::Pipeline::Result::Each;
40 81     81   1660 sub new { my ($class, %option) = @_; bless {%option}, $class }
  81         2455  
41 1567     1567   6080 sub status { shift->{status} }
42 0     0   0 sub cmd { shift->{cmd} }
43 214     214   392 sub pid { shift->{pid} }
44             }
45              
46             {
47             package Process::Pipeline::Result;
48 19     19   7661 use POSIX ();
  19         111258  
  19         755  
49 19     19   9427 use Process::Status;
  19         15760  
  19         768  
50 19     19   95 use overload '@{}' => sub { shift->{result} };
  19     618   19  
  19         142  
  618         4058  
51              
52             sub new {
53 35     35   57 my $class = shift;
54 35         150 bless {result => [], fh => undef}, $class;
55             }
56             sub push :method {
57 81     81   174 my ($self, $hash) = @_;
58 81         2595 push @$self, $hash;
59 81         297 $self;
60             }
61             sub is_success {
62 18     18   600 my $self = shift;
63 18         36 @$self == grep { $_->status->is_success } @$self;
  58         340  
64             }
65             sub fh {
66 28     28   54645 my $self = shift;
67 28 100       146 $self->{fh} = shift if @_;
68 28         61 $self->{fh};
69             }
70             sub wait :method {
71 18     18   55 my $self = shift;
72 18         64 while (grep { !defined $_->status } @$self) {
  1509         3570  
73 425         3557 my $pid = waitpid -1, POSIX::WNOHANG();
74 425         1243 my $save = $?;
75 425 100       1313 if ($pid == 0) {
    50          
76 367         3838874 select undef, undef, undef, 0.01;
77             } elsif ($pid == -1) {
78 0         0 last;
79             } else {
80 58         136 my ($found) = grep { $_->pid == $pid } @$self;
  214         260  
81 58 50       163 if (!$found) {
82 0         0 warn "waitpid returns $pid, but is not our child!";
83 0         0 last;
84             }
85 58         452 $found->{status} = Process::Status->new($save);
86             }
87             }
88 18 50       111 if (my $filename = delete $self->{_filename}) {
89 18         460 local $!;
90 18         1230 unlink $filename;
91             }
92 18         57 $self;
93             }
94             }
95              
96             sub new {
97 79     79 1 41541 bless { process => [] }, shift;
98             }
99              
100             sub push :method {
101 64     64 1 78 my ($self, $callback) = @_;
102 64         220 my $p = Process::Pipeline::Process->new;
103 64         132 $callback->($p);
104 64         144 $self->_push($p);
105             }
106              
107             sub _push {
108 223     223   226 my ($self, $p) = @_;
109 223         157 push @{$self->{process}}, $p;
  223         311  
110 223         514 $self;
111             }
112              
113             sub start {
114 35     35 1 171 my ($self, %option) = @_;
115 35         31 my $n = $#{$self->{process}};
  35         78  
116 35         161 my @pipe = map { pipe my $read, my $write; [$read, $write] } 0..($n - 1);
  86         1012  
  86         215  
117             my $close = sub {
118 17     17   171 my $i = shift;
119 17 100       427 my @close = map { @{$pipe[$_]} } grep { $_ != $i - 1 && $_ != $i } 0..$#pipe;
  22         83  
  22         270  
  46         960  
120 17         907 $_->close for @close;
121 35         134 };
122              
123 35         62 my ($main_out_fh, $main_out_filename);
124 35         206 my $result = Process::Pipeline::Result->new;
125 35         92 for my $i (0..$n) {
126 98         157 my $process = $self->{process}[$i];
127 98 50 66     619 if ($i == $n && !$process->set->{">"} && !$process->set->{">>"}) {
      66        
128 23         627 ($main_out_fh, $main_out_filename) = File::Temp::tempfile(UNLINK => 0);
129             }
130 98         90237 my $pid = fork;
131 98 50       1333 die "fork: $!" unless defined $pid;
132 98 100       495 if ($pid == 0) {
133 17 100       799 if ($main_out_filename) {
134 5         152 close $main_out_fh;
135 5 50       635 open STDOUT, ">>", $main_out_filename or die $!;
136             }
137 17         454 $close->($i);
138 17 100       726 my $read = $i - 1 >= 0 ? $pipe[$i - 1] : undef;
139 17         129 my $write = $pipe[$i];
140 17 100       171 if ($read) {
141 12         197 $read->[1]->close;
142 12         690 open STDIN, "<&", $read->[0];
143 12         91 $read->[0]->close;
144             }
145 17 100       189 if ($write) {
146 12         162 $write->[0]->close;
147 12         531 open STDOUT, ">&", $write->[1];
148 12         92 $write->[1]->close;
149             }
150              
151 17         117 my %set = %{$process->set};
  17         348  
152 17 50       118 if (my $in = $set{"<"}) {
153 0 0       0 open STDIN, "<", $in or die "open $in: $!";
154             }
155 17 50 33     356 if (my $out = $set{">"} or my $append = $set{">>"}) {
156 0 0       0 my $mode = defined $out ? ">" : ">>";
157 0 0       0 my $file = defined $out ? $out : $append;
158 0 0       0 open STDOUT, $mode, $file or die "open $file: $!";
159             }
160 17 100 66     217 if (my $out = $set{"2>"} or my $append = $set{"2>>"}) {
161 1 50       8 my $mode = defined $out ? ">" : ">>";
162 1 50       5 my $file = defined $out ? $out : $append;
163 1 50       116 open STDERR, $mode, $file or die "open $file: $!";
164             }
165 17 50       150 if (exists $set{"2>&1"}) {
166 0         0 open STDERR, ">&", \*STDOUT;
167             }
168 17         854 STDOUT->autoflush(1);
169              
170 17         2492 my $cmd = $process->cmd;
171 17 50       148 if (ref $cmd eq "CODE") {
172 0         0 $cmd->();
173 0         0 exit;
174             } else {
175 17         237 my @cmd = @$cmd;
176 17         50 exec {$cmd[0]} @cmd;
  17         0  
177 0         0 exit 255;
178             }
179             }
180 81         2985 $result->push(Process::Pipeline::Result::Each->new(
181             pid => $pid,
182             cmd => $process->cmd,
183             status => undef,
184             ));
185             }
186 18         255 $_->close for map { @$_ } @pipe;
  40         921  
187 18 50       746 if ($main_out_filename) {
188 18         169 $result->{_filename} = $main_out_filename;
189 18         55 $result->fh($main_out_fh);
190             }
191 18 50       150 $result->wait unless $option{async};
192 18         590 $result;
193             }
194              
195             1;
196             __END__