File Coverage

blib/lib/Parallel/Pipes.pm
Criterion Covered Total %
statement 138 146 94.5
branch 30 42 71.4
condition n/a
subroutine 29 29 100.0
pod 0 6 0.0
total 197 223 88.3


line stmt bran cond sub pod time code
1             package Parallel::Pipes;
2 23     23   337212 use 5.008001;
  23         131  
3 23     23   85 use strict;
  23         35  
  23         310  
4 23     23   275 use warnings;
  23         40  
  23         556  
5 23     23   6115 use IO::Handle;
  23         65613  
  23         982  
6 23     23   8166 use IO::Select;
  23         33430  
  23         1712  
7              
8 23     23   354 use constant WIN32 => $^O eq 'MSWin32';
  23         50  
  23         2248  
9              
10             our $VERSION = '0.102';
11              
12             {
13             package Parallel::Pipe::Impl;
14 23     23   12460 use Storable ();
  23         64570  
  23         9770  
15             sub new {
16 99     99   3139 my ($class, %option) = @_;
17 99 50       1689 my $read_fh = delete $option{read_fh} or die;
18 99 50       594 my $write_fh = delete $option{write_fh} or die;
19 99         5863 $write_fh->autoflush(1);
20 99         27282 bless { %option, read_fh => $read_fh, write_fh => $write_fh, buf => '' }, $class;
21             }
22             sub read :method {
23 378     378   823 my $self = shift;
24 378 100       1362 my $_size = $self->_read(4) or return;
25 359         1168 my $size = unpack 'I', $_size;
26 359         780 my $freezed = $self->_read($size);
27 359         3162 Storable::thaw($freezed);
28             }
29             sub write :method {
30 359     359   646321 my ($self, $data) = @_;
31 359         1765 my $freezed = Storable::freeze({data => $data});
32 359         22558 my $size = pack 'I', length($freezed);
33 359         4387 $self->_write("$size$freezed");
34             }
35             sub _read {
36 737     737   1265 my ($self, $size) = @_;
37 737         1056 my $fh = $self->{read_fh};
38 737         1069 my $offset = length $self->{buf};
39 737         1501 while ($offset < $size) {
40 381         490152 my $len = sysread $fh, $self->{buf}, 65536, $offset;
41 381 50       2554 if (!defined $len) {
    100          
42 0         0 die $!;
43             } elsif ($len == 0) {
44 19         154 last;
45             } else {
46 362         881 $offset += $len;
47             }
48             }
49 737         4542 return substr $self->{buf}, 0, $size, '';
50             }
51             sub _write {
52 359     359   1764 my ($self, $data) = @_;
53 359         1636 my $fh = $self->{write_fh};
54 359         488 my $size = length $data;
55 359         431 my $offset = 0;
56 359         1005 while ($size) {
57 359         120431 my $len = syswrite $fh, $data, $size, $offset;
58 359 50       1673 if (!defined $len) {
    50          
59 0         0 die $!;
60             } elsif ($len == 0) {
61 0         0 last;
62             } else {
63 359         530 $size -= $len;
64 359         858 $offset += $len;
65             }
66             }
67 359         13097 $size;
68             }
69             }
70             {
71             package Parallel::Pipe::Here;
72             our @ISA = qw(Parallel::Pipe::Impl);
73 23     23   209 use Carp ();
  23         23  
  23         4364  
74             sub new {
75 80     80   3844 my ($class, %option) = @_;
76 80         2742 $class->SUPER::new(%option, _written => 0);
77             }
78             sub is_written {
79 1079     1079   3430 my $self = shift;
80 1079         4483 $self->{_written} == 1;
81             }
82             sub read :method {
83 257     257   535 my $self = shift;
84 257 50       452 if (!$self->is_written) {
85 0         0 Carp::croak("This pipe has not been written; you cannot read it");
86             }
87 257         599 $self->{_written}--;
88 257 50       622 return unless my $read = $self->SUPER::read;
89 257         8999 $read->{data};
90             }
91             sub write :method {
92 257     257   1462 my ($self, $task) = @_;
93 257 50       1002 if ($self->is_written) {
94 0         0 Carp::croak("This pipe has already been written; you must read it first");
95             }
96 257         515 $self->{_written}++;
97 257         765 $self->SUPER::write($task);
98             }
99             }
100             {
101             package Parallel::Pipe::There;
102             our @ISA = qw(Parallel::Pipe::Impl);
103             }
104             {
105             package Parallel::Pipe::Impl::NoFork;
106 23     23   132 use Carp ();
  23         81  
  23         18452  
107             sub new {
108 23     23   86 my ($class, %option) = @_;
109 23         127 bless {%option}, $class;
110             }
111             sub is_written {
112 2168     2168   3129 my $self = shift;
113 2168         8640 exists $self->{_result};
114             }
115             sub read :method {
116 713     713   1426 my $self = shift;
117 713 50       1509 if (!$self->is_written) {
118 0         0 Carp::croak("This pipe has not been written; you cannot read it");
119             }
120 713         1933 delete $self->{_result};
121             }
122             sub write :method {
123 713     713   1797 my ($self, $task) = @_;
124 713 50       1376 if ($self->is_written) {
125 0         0 Carp::croak("This pipe has already been written; you must read it first");
126             }
127 713         4225 my $result = $self->{code}->($task);
128 713         3796978 $self->{_result} = $result;
129             }
130             }
131              
132             sub new {
133 51     51 0 22259 my ($class, $number, $code) = @_;
134 51         127 if (WIN32 and $number != 1) {
135             die "The number of pipes must be 1 under WIN32 environment.\n";
136             }
137 51         426 my $self = bless {
138             code => $code,
139             number => $number,
140             no_fork => $number == 1,
141             pipes => {},
142             }, $class;
143              
144 51 100       170 if ($self->no_fork) {
145 23         167 $self->{pipes}{-1} = Parallel::Pipe::Impl::NoFork->new(code => $self->{code});
146             } else {
147 28         168 $self->_fork for 1 .. $number;
148             }
149 32         262 $self;
150             }
151              
152 1041     1041 0 6340 sub no_fork { shift->{no_fork} }
153              
154             sub _fork {
155 99     99   389 my $self = shift;
156 99         203 my $code = $self->{code};
157 99         3999 pipe my $read_fh1, my $write_fh1;
158 99         6889 pipe my $read_fh2, my $write_fh2;
159 99         272186 my $pid = fork;
160 99 50       4236 die "fork failed" unless defined $pid;
161 99 100       1344 if ($pid == 0) {
162 19         1473 srand;
163 19         927 close $_ for $read_fh1, $write_fh2, map { ($_->{read_fh}, $_->{write_fh}) } $self->pipes;
  36         1516  
164 19         1256 my $there = Parallel::Pipe::There->new(read_fh => $read_fh2, write_fh => $write_fh1);
165 19         589 while (my $read = $there->read) {
166 102         4396 $there->write( $code->($read->{data}) );
167             }
168 19         5607 exit;
169             }
170 80         6768 close $_ for $write_fh1, $read_fh2;
171 80         3991 $self->{pipes}{$pid} = Parallel::Pipe::Here->new(
172             pid => $pid, read_fh => $read_fh1, write_fh => $write_fh2,
173             );
174             }
175              
176             sub pipes {
177 1001     1001 0 1886 my $self = shift;
178 1001         1619 map { $self->{pipes}{$_} } sort { $a <=> $b } keys %{$self->{pipes}};
  1943         7057  
  1975         5463  
  1001         12485  
179             }
180              
181             sub is_ready {
182 959     959 0 2702 my $self = shift;
183 959 100       3209 return $self->pipes if $self->no_fork;
184              
185 223 100       1598 my @pipes = @_ ? @_ : $self->pipes;
186 223 100       704 if (my @ready = grep { $_->{_written} == 0 } @pipes) {
  1029         1984  
187 56         194 return @ready;
188             }
189              
190 167         378 my $select = IO::Select->new(map { $_->{read_fh} } @pipes);
  755         1992  
191 167         22572 my @ready = $select->can_read;
192              
193 167         430636 my @return;
194 167         1475 for my $pipe (@pipes) {
195 755 100       1986 if (grep { $pipe->{read_fh} == $_ } @ready) {
  1185         3635  
196 254         430 push @return, $pipe;
197             }
198             }
199 167         1082 return @return;
200             }
201              
202             sub is_written {
203 47     47 0 148 my $self = shift;
204 47         88 grep { $_->is_written } $self->pipes;
  185         250  
205             }
206              
207             sub close :method {
208 31     31 0 19684 my $self = shift;
209 31 100       85 return if $self->no_fork;
210              
211 8         23 close $_ for map { ($_->{write_fh}, $_->{read_fh}) } $self->pipes;
  40         111769  
212 8         95 while (%{$self->{pipes}}) {
  48         386  
213 40         2619518 my $pid = wait;
214 40 50       1133 if (delete $self->{pipes}{$pid}) {
215             # OK
216             } else {
217 0         0 warn "wait() unexpectedly returns $pid\n";
218             }
219             }
220             }
221              
222             1;
223             __END__