File Coverage

blib/lib/Parallel/Pipes.pm
Criterion Covered Total %
statement 144 152 94.7
branch 34 46 73.9
condition 2 2 100.0
subroutine 29 29 100.0
pod 0 6 0.0
total 209 235 88.9


line stmt bran cond sub pod time code
1             package Parallel::Pipes;
2 34     34   1219252 use 5.008001;
  34         330  
3 34     34   168 use strict;
  34         68  
  34         642  
4 34     34   136 use warnings;
  34         57  
  34         958  
5 34     34   13318 use IO::Handle;
  34         147015  
  34         1529  
6 34     34   16057 use IO::Select;
  34         56790  
  34         1880  
7              
8 34     34   293 use constant WIN32 => $^O eq 'MSWin32';
  34         62  
  34         4025  
9              
10             our $VERSION = '0.200';
11              
12             {
13             package Parallel::Pipes::Impl;
14 34     34   22555 use Storable ();
  34         109791  
  34         15623  
15             sub new {
16 164     164   4020 my ($class, %option) = @_;
17 164 50       2795 my $read_fh = delete $option{read_fh} or die;
18 164 50       973 my $write_fh = delete $option{write_fh} or die;
19 164         8794 $write_fh->autoflush(1);
20 164         43302 bless { %option, read_fh => $read_fh, write_fh => $write_fh, buf => '' }, $class;
21             }
22             sub read :method {
23 433     433   1249 my $self = shift;
24 433 100       2669 my $_size = $self->_read(4) or return;
25 404         2025 my $size = unpack 'I', $_size;
26 404         1583 my $freezed = $self->_read($size);
27 404         2087 Storable::thaw($freezed);
28             }
29             sub write :method {
30 404     404   10652426 my ($self, $data) = @_;
31 404         2912 my $freezed = Storable::freeze({data => $data});
32 404         28891 my $size = pack 'I', length($freezed);
33 404         3512 $self->_write("$size$freezed");
34             }
35             sub _read {
36 837     837   2681 my ($self, $size) = @_;
37 837         2314 my $fh = $self->{read_fh};
38 837         1956 my $offset = length $self->{buf};
39 837         2698 while ($offset < $size) {
40 436         377463 my $len = sysread $fh, $self->{buf}, 65536, $offset;
41 436 50       3588 if (!defined $len) {
    100          
42 0         0 die $!;
43             } elsif ($len == 0) {
44 29         449 last;
45             } else {
46 407         1644 $offset += $len;
47             }
48             }
49 837         6840 return substr $self->{buf}, 0, $size, '';
50             }
51             sub _write {
52 404     404   1144 my ($self, $data) = @_;
53 404         1385 my $fh = $self->{write_fh};
54 404         916 my $size = length $data;
55 404         971 my $offset = 0;
56 404         1375 while ($size) {
57 404         18825 my $len = syswrite $fh, $data, $size, $offset;
58 404 50       2536 if (!defined $len) {
    50          
59 0         0 die $!;
60             } elsif ($len == 0) {
61 0         0 last;
62             } else {
63 404         949 $size -= $len;
64 404         1441 $offset += $len;
65             }
66             }
67 404         3395 $size;
68             }
69             }
70             {
71             package Parallel::Pipes::Here;
72             our @ISA = qw(Parallel::Pipes::Impl);
73 34     34   392 use Carp ();
  34         201  
  34         8636  
74             sub new {
75 135     135   6692 my ($class, %option) = @_;
76 135         6007 $class->SUPER::new(%option, _written => 0);
77             }
78             sub is_written {
79 1264     1264   2082 my $self = shift;
80 1264         4123 $self->{_written} == 1;
81             }
82             sub read :method {
83 292     292   924 my $self = shift;
84 292 50       570 if (!$self->is_written) {
85 0         0 Carp::croak("This pipe has not been written; you cannot read it");
86             }
87 292         605 $self->{_written}--;
88 292 50       1095 return unless my $read = $self->SUPER::read;
89 292         8380 $read->{data};
90             }
91             sub write :method {
92 292     292   910 my ($self, $task) = @_;
93 292 50       712 if ($self->is_written) {
94 0         0 Carp::croak("This pipe has already been written; you must read it first");
95             }
96 292         651 $self->{_written}++;
97 292         1029 $self->SUPER::write($task);
98             }
99             }
100             {
101             package Parallel::Pipes::There;
102             our @ISA = qw(Parallel::Pipes::Impl);
103             }
104             {
105             package Parallel::Pipes::Impl::NoFork;
106 34     34   239 use Carp ();
  34         74  
  34         33856  
107             sub new {
108 24     24   116 my ($class, %option) = @_;
109 24         242 bless {%option}, $class;
110             }
111             sub is_written {
112 2172     2172   4936 my $self = shift;
113 2172         9476 exists $self->{_result};
114             }
115             sub read :method {
116 714     714   2576 my $self = shift;
117 714 50       1606 if (!$self->is_written) {
118 0         0 Carp::croak("This pipe has not been written; you cannot read it");
119             }
120 714         2854 delete $self->{_result};
121             }
122             sub write :method {
123 714     714   3090 my ($self, $task) = @_;
124 714 50       1566 if ($self->is_written) {
125 0         0 Carp::croak("This pipe has already been written; you must read it first");
126             }
127 714         2492 my $result = $self->{work}->($task);
128 714         4841807 $self->{_result} = $result;
129             }
130             }
131              
132             sub new {
133 69     69 0 50810 my ($class, $number, $work, $option) = @_;
134 69         213 if (WIN32 and $number != 1) {
135             die "The number of pipes must be 1 under WIN32 environment.\n";
136             }
137 69   100     1105 my $self = bless {
138             work => $work,
139             number => $number,
140             no_fork => $number == 1,
141             pipes => {},
142             option => $option || {},
143             }, $class;
144              
145 69 100       342 if ($self->no_fork) {
146 24         257 $self->{pipes}{-1} = Parallel::Pipes::Impl::NoFork->new(pid => -1, work => $self->{work});
147             } else {
148 45         293 $self->_fork for 1 .. $number;
149             }
150 40         645 $self;
151             }
152              
153 1139     1139 0 6474 sub no_fork { shift->{no_fork} }
154              
155             sub _fork {
156 164     164   482 my $self = shift;
157 164         429 my $work = $self->{work};
158 164         7393 pipe my $read_fh1, my $write_fh1;
159 164         6825 pipe my $read_fh2, my $write_fh2;
160 164         122631 my $pid = fork;
161 164 50       8815 die "fork failed" unless defined $pid;
162 164 100       3143 if ($pid == 0) {
163 29         3429 srand;
164 29         2609 close $_ for $read_fh1, $write_fh2, map { ($_->{read_fh}, $_->{write_fh}) } $self->pipes;
  56         3856  
165 29         3852 my $there = Parallel::Pipes::There->new(read_fh => $read_fh2, write_fh => $write_fh1);
166 29         1474 while (my $read = $there->read) {
167 112         9459 $there->write( $work->($read->{data}) );
168             }
169 29         14766 exit;
170             }
171 135         10996 close $_ for $write_fh1, $read_fh2;
172 135         8624 $self->{pipes}{$pid} = Parallel::Pipes::Here->new(
173             pid => $pid, read_fh => $read_fh1, write_fh => $write_fh2,
174             );
175             }
176              
177             sub pipes {
178 1096     1096 0 2655 my $self = shift;
179 1096         2616 map { $self->{pipes}{$_} } sort { $a <=> $b } keys %{$self->{pipes}};
  2380         9709  
  2718         6613  
  1096         9854  
180             }
181              
182             sub is_ready {
183 1031     1031 0 4303 my $self = shift;
184 1031 100       3102 return $self->pipes if $self->no_fork;
185              
186 293 100       1548 my @pipes = @_ ? @_ : $self->pipes;
187 293 100       748 if (my @ready = grep { $_->{_written} == 0 } @pipes) {
  1360         3641  
188 59         326 return @ready;
189             }
190              
191 234         989 my $select = IO::Select->new(map { $_->{read_fh} } @pipes);
  1071         2799  
192 234         29966 my @ready;
193 234 100       620 if (my $tick = $self->{option}{idle_tick}) {
194 15         35 while (1) {
195 29 100       360 if (my @r = $select->can_read($tick)) {
196 15         1395529 @ready = @r, last;
197             }
198 14         5608905 $self->{option}{idle_work}->();
199             }
200             } else {
201 219         571 @ready = $select->can_read;
202             }
203              
204 234         502159 my @return;
205 234         813 for my $pipe (@pipes) {
206 1071 100       1815 if (grep { $pipe->{read_fh} == $_ } @ready) {
  1311         4661  
207 288         665 push @return, $pipe;
208             }
209             }
210 234         2165 return @return;
211             }
212              
213             sub is_written {
214 68     68 0 514 my $self = shift;
215 68         272 grep { $_->is_written } $self->pipes;
  290         689  
216             }
217              
218             sub close :method {
219 39     39 0 46561 my $self = shift;
220 39 100       288 return if $self->no_fork;
221              
222 15         77 close $_ for map { ($_->{write_fh}, $_->{read_fh}) } $self->pipes;
  75         3934  
223 15         116 while (%{$self->{pipes}}) {
  90         729  
224 75         4616856 my $pid = wait;
225 75 50       2399 if (delete $self->{pipes}{$pid}) {
226             # OK
227             } else {
228 0         0 warn "wait() unexpectedly returns $pid\n";
229             }
230             }
231             }
232              
233             1;
234             __END__