File Coverage

blib/lib/Parallel/Pipes.pm
Criterion Covered Total %
statement 138 146 94.5
branch 30 42 71.4
condition n/a
subroutine 29 29 100.0
pod 0 6 0.0
total 197 223 88.3


line stmt bran cond sub pod time code
1             package Parallel::Pipes;
2 18     18   399817 use 5.008001;
  18         128  
3 18     18   78 use strict;
  18         18  
  18         296  
4 18     18   71 use warnings;
  18         19  
  18         428  
5 18     18   5724 use IO::Handle;
  18         61349  
  18         689  
6 18     18   7090 use IO::Select;
  18         24746  
  18         868  
7              
8 18     18   102 use constant WIN32 => $^O eq 'MSWin32';
  18         36  
  18         1744  
9              
10             our $VERSION = '0.101';
11              
12             {
13             package Parallel::Pipe::Impl;
14 18     18   8986 use Storable ();
  18         47601  
  18         6773  
15             sub new {
16 54     54   1045 my ($class, %option) = @_;
17 54 50       792 my $read_fh = delete $option{read_fh} or die;
18 54 50       429 my $write_fh = delete $option{write_fh} or die;
19 54         2341 $write_fh->autoflush(1);
20 54         11194 bless { %option, read_fh => $read_fh, write_fh => $write_fh, buf => '' }, $class;
21             }
22             sub read :method {
23 156     156   354 my $self = shift;
24 156 100       647 my $_size = $self->_read(4) or return;
25 142         507 my $size = unpack 'I', $_size;
26 142         413 my $freezed = $self->_read($size);
27 142         622 Storable::thaw($freezed);
28             }
29             sub write :method {
30 142     142   637432 my ($self, $data) = @_;
31 142         831 my $freezed = Storable::freeze({data => $data});
32 142         7910 my $size = pack 'I', length($freezed);
33 142         1005 $self->_write("$size$freezed");
34             }
35             sub _read {
36 298     298   688 my ($self, $size) = @_;
37 298         536 my $fh = $self->{read_fh};
38 298         461 my $offset = length $self->{buf};
39 298         803 while ($offset < $size) {
40 159         204439 my $len = sysread $fh, $self->{buf}, 65536, $offset;
41 159 50       1327 if (!defined $len) {
    100          
42 0         0 die $!;
43             } elsif ($len == 0) {
44 14         78 last;
45             } else {
46 145         588 $offset += $len;
47             }
48             }
49 298         1716 return substr $self->{buf}, 0, $size, '';
50             }
51             sub _write {
52 142     142   313 my ($self, $data) = @_;
53 142         306 my $fh = $self->{write_fh};
54 142         215 my $size = length $data;
55 142         209 my $offset = 0;
56 142         415 while ($size) {
57 142         6616 my $len = syswrite $fh, $data, $size, $offset;
58 142 50       702 if (!defined $len) {
    50          
59 0         0 die $!;
60             } elsif ($len == 0) {
61 0         0 last;
62             } else {
63 142         195 $size -= $len;
64 142         377 $offset += $len;
65             }
66             }
67 142         671 $size;
68             }
69             }
70             {
71             package Parallel::Pipe::Here;
72             our @ISA = qw(Parallel::Pipe::Impl);
73 18     18   150 use Carp ();
  18         24  
  18         4048  
74             sub new {
75 40     40   1632 my ($class, %option) = @_;
76 40         1553 $class->SUPER::new(%option, _written => 0);
77             }
78             sub is_written {
79 301     301   452 my $self = shift;
80 301         734 $self->{_written} == 1;
81             }
82             sub read :method {
83 71     71   171 my $self = shift;
84 71 50       118 if (!$self->is_written) {
85 0         0 Carp::croak("This pipe has not been written; you cannot read it");
86             }
87 71         95 $self->{_written}--;
88 71 50       192 return unless my $read = $self->SUPER::read;
89 71         1832 $read->{data};
90             }
91             sub write :method {
92 71     71   213 my ($self, $task) = @_;
93 71 50       125 if ($self->is_written) {
94 0         0 Carp::croak("This pipe has already been written; you must read it first");
95             }
96 71         112 $self->{_written}++;
97 71         174 $self->SUPER::write($task);
98             }
99             }
100             {
101             package Parallel::Pipe::There;
102             our @ISA = qw(Parallel::Pipe::Impl);
103             }
104             {
105             package Parallel::Pipe::Impl::NoFork;
106 18     18   114 use Carp ();
  18         66  
  18         16331  
107             sub new {
108 12     12   48 my ($class, %option) = @_;
109 12         114 bless {%option}, $class;
110             }
111             sub is_written {
112 1134     1134   2628 my $self = shift;
113 1134         4806 exists $self->{_result};
114             }
115             sub read :method {
116 372     372   1404 my $self = shift;
117 372 50       930 if (!$self->is_written) {
118 0         0 Carp::croak("This pipe has not been written; you cannot read it");
119             }
120 372         1776 delete $self->{_result};
121             }
122             sub write :method {
123 372     372   1854 my ($self, $task) = @_;
124 372 50       834 if ($self->is_written) {
125 0         0 Carp::croak("This pipe has already been written; you must read it first");
126             }
127 372         1758 my $result = $self->{code}->($task);
128 372         3854910 $self->{_result} = $result;
129             }
130             }
131              
132             sub new {
133 29     29 0 24919 my ($class, $number, $code) = @_;
134 29         89 if (WIN32 and $number != 1) {
135             die "The number of pipes must be 1 under WIN32 environment.\n";
136             }
137 29         359 my $self = bless {
138             code => $code,
139             number => $number,
140             no_fork => $number == 1,
141             pipes => {},
142             }, $class;
143              
144 29 100       117 if ($self->no_fork) {
145 12         84 $self->{pipes}{-1} = Parallel::Pipe::Impl::NoFork->new(code => $self->{code});
146             } else {
147 17         114 $self->_fork for 1 .. $number;
148             }
149 15         129 $self;
150             }
151              
152 489     489 0 3240 sub no_fork { shift->{no_fork} }
153              
154             sub _fork {
155 54     54   137 my $self = shift;
156 54         108 my $code = $self->{code};
157 54         2337 pipe my $read_fh1, my $write_fh1;
158 54         1965 pipe my $read_fh2, my $write_fh2;
159 54         35321 my $pid = fork;
160 54 50       2557 die "fork failed" unless defined $pid;
161 54 100       996 if ($pid == 0) {
162 14         1120 srand;
163 14         752 close $_ for $read_fh1, $write_fh2, map { ($_->{read_fh}, $_->{write_fh}) } $self->pipes;
  26         1568  
164 14         1083 my $there = Parallel::Pipe::There->new(read_fh => $read_fh2, write_fh => $write_fh1);
165 14         455 while (my $read = $there->read) {
166 71         3622 $there->write( $code->($read->{data}) );
167             }
168 14         3875 exit;
169             }
170 40         2622 close $_ for $write_fh1, $read_fh2;
171 40         2374 $self->{pipes}{$pid} = Parallel::Pipe::Here->new(
172             pid => $pid, read_fh => $read_fh1, write_fh => $write_fh2,
173             );
174             }
175              
176             sub pipes {
177 477     477 0 891 my $self = shift;
178 477         952 map { $self->{pipes}{$_} } sort { $a <=> $b } keys %{$self->{pipes}};
  746         3880  
  568         1203  
  477         4476  
179             }
180              
181             sub is_ready {
182 446     446 0 3086 my $self = shift;
183 446 100       1818 return $self->pipes if $self->no_fork;
184              
185 62 100       262 my @pipes = @_ ? @_ : $self->pipes;
186 62 100       134 if (my @ready = grep { $_->{_written} == 0 } @pipes) {
  288         602  
187 24         160 return @ready;
188             }
189              
190 38         81 my $select = IO::Select->new(map { $_->{read_fh} } @pipes);
  174         402  
191 38         4010 my @ready = $select->can_read;
192              
193 38         134690 my @return;
194 38         120 for my $pipe (@pipes) {
195 174 100       262 if (grep { $pipe->{read_fh} == $_ } @ready) {
  314         803  
196 67         118 push @return, $pipe;
197             }
198             }
199 38         284 return @return;
200             }
201              
202             sub is_written {
203 21     21 0 140 my $self = shift;
204 21         81 grep { $_->is_written } $self->pipes;
  55         111  
205             }
206              
207             sub close :method {
208 14     14 0 22664 my $self = shift;
209 14 100       48 return if $self->no_fork;
210              
211 2         9 close $_ for map { ($_->{write_fh}, $_->{read_fh}) } $self->pipes;
  10         343  
212 2         19 while (%{$self->{pipes}}) {
  12         126  
213 10         466150 my $pid = wait;
214 10 50       300 if (delete $self->{pipes}{$pid}) {
215             # OK
216             } else {
217 0         0 warn "wait() unexpectedly returns $pid\n";
218             }
219             }
220             }
221              
222             1;
223             __END__