File Coverage

blib/lib/Proc/Topus.pm
Criterion Covered Total %
statement 108 108 100.0
branch 48 56 85.7
condition 9 9 100.0
subroutine 13 13 100.0
pod 1 1 100.0
total 179 187 95.7


line stmt bran cond sub pod time code
1             package Proc::Topus;
2              
3 34     34   45673 use strict;
  34         66  
  34         1185  
4 34     34   170 use warnings;
  34         66  
  34         880  
5              
6 34     34   169 use Exporter;
  34         108  
  34         1614  
7 34     34   37528 use Socket;
  34         164074  
  34         68374  
8              
9              
10             our $VERSION = '0.02';
11              
12             our @ISA = qw( Exporter );
13             our @EXPORT_OK = qw( spawn );
14              
15              
16             sub _CONDUIT () { 'socketpair' }
17             sub _PARENT () { 0 }
18             sub _WORKER () { 1 }
19             sub _READER () { 0 }
20             sub _WRITER () { 1 }
21              
22              
23             sub spawn {
24 193     193 1 1230197 my $args = _args( @_ );
25              
26 187         717 my $workers = $args->{workers};
27 187 100 100     3676 die 'No workers defined'
28             unless defined $workers && keys %$workers > 0;
29              
30 183         1731 my $pairs = _alloc_pairs( $workers, $args->{conduit}, $args->{autoflush} );
31              
32 177         443 my %pids;
33 177         1181 for my $name ( keys %$workers ) {
34 177         391 my $config = $workers->{$name};
35              
36 177         224220 my $pid = fork;
37 177 50       10798 defined $pid
38             or die 'fork: ', $!;
39              
40 177         11507 $pids{$pid} = $name;
41             next
42 177 100       10515 if $pid != 0;
43              
44 31         2055 my @pairs = map { $_->[_WORKER] } @{ delete $pairs->{$name} };
  61         1434  
  31         2004  
45             delete $pairs->{$_}
46 31         4495 for keys %$pairs;
47              
48 31 100       811 if( $config->{setsid} ) {
49 3         8457 require POSIX;
50 3 50       38067 POSIX::setsid()
51             or die 'setsid: ', $!;
52             }
53              
54 31         544 my $loader = $config->{loader};
55 31 100       657 $loader->( @pairs )
56             if ref $loader eq 'CODE';
57              
58 30         678 while( @pairs ) {
59 50         386 my $pair = shift @pairs;
60              
61 50         99226 my $pid = fork;
62 50 50       3823 defined $pid
63             or die 'fork: ', $!;
64              
65             next
66 50 100       4537 if $pid != 0;
67              
68 20         1046 splice @pairs;
69              
70 20         1493 my $main = $config->{main};
71 18     18   2343 $main = sub { $pair }
72 20 100       2251 unless ref $main eq 'CODE';
73              
74 20         1044 return $main->( @$pair[_READER, _WRITER] );
75             }
76              
77 10         4912 exit 0;
78             }
79              
80 146         25738 for my $name ( keys %$pairs ) {
81 291         17779 $pairs->{$name} = [
82 291         1172 map { { reader => $_->[_READER], writer => $_->[_WRITER] } }
83 146         3977 map { $_->[_PARENT] }
84 146         485 @{ $pairs->{$name} }
85             ];
86             }
87              
88 146         4055 _wait( \%pids );
89              
90 145         1668 my $main = $args->{main};
91 120     120   2344 $main = sub { $pairs }
92 145 100       2428 unless ref $main eq 'CODE';
93              
94 145         1412 $main->( $pairs )
95             }
96              
97             sub _args {
98 193 100   193   907 if( scalar @_ == 1 ) {
    100          
99 179 100 100     4912 unless( defined $_[0] && ref $_[0] eq 'HASH' ) {
100 4         36 die "Single arguments must be a HASH ref";
101             }
102              
103 175         465 return $_[0]
104             }
105             elsif( @_ % 2 ) {
106 2         20 die "Odd number of arguments";
107             }
108             else {
109 12         38 return { @_ };
110             }
111             }
112              
113             sub _alloc_pairs {
114 183     183   694 my ( $workers, $_conduit, $_autoflush ) = @_;
115              
116 183 100       2830 $_conduit = _CONDUIT
117             unless defined $_conduit;
118              
119 183 100       631 $_autoflush = 1
120             unless defined $_autoflush;
121              
122 183         332 my %pairs;
123 183         1904 for my $name ( keys %$workers ) {
124 183         308 my $config = $workers->{$name};
125              
126 183         418 my $count = $config->{count};
127 183 100 100     1048 die "Invalid worker count ($name)"
128             unless defined $count && $count > 0;
129              
130 179         341 my $conduit = $config->{conduit};
131 179 100       469 $conduit = $_conduit
132             unless defined $conduit;
133              
134 179         310 my $autoflush = $config->{autoflush};
135 179 100       604 $autoflush = $_autoflush
136             unless defined $autoflush;
137              
138 354         927 $pairs{$name} =
139 179         640 [ map { _alloc_pair( $name, $conduit, $autoflush ) } 1 .. $count ];
140             }
141              
142 177         516 \%pairs
143             }
144              
145             sub _alloc_pair {
146 354     354   744 my ( $name, $conduit, $autoflush ) = @_;
147              
148 354 100       1060 if( $conduit eq 'socketpair' ) {
    100          
149 282 50       13911 socketpair my $ps, my $ws, AF_UNIX, SOCK_STREAM, PF_UNSPEC
150             or die 'socketpair: ', $!;
151              
152 282         1132 _autoflush( $autoflush, $ps, $ws );
153              
154 282         813 return _pair( $ps, $ps, $ws, $ws );
155             }
156             elsif( $conduit eq 'pipe' ) {
157 70 50       3944 pipe my $pr, my $ww
158             or die 'pipe: ', $!;
159 70 50       1771 pipe my $wr, my $pw
160             or die 'pipe: ', $!;
161              
162 70         346 _autoflush( $autoflush, $pr, $pw, $wr, $ww );
163              
164 70         194 return _pair( $pr, $pw, $wr, $ww );
165             }
166              
167 2         18 die "Invalid worker conduit '$conduit' for worker '$name'";
168             }
169              
170             sub _autoflush {
171             return
172 352 100   352   994 unless shift;
173              
174 306         1679 my $fh = select;
175 752         2539 do { select $_; $| = 1 }
  752         1932  
176 306         883 for @_;
177 306         999 select $fh
178             }
179              
180             sub _pair {
181 352     352   695 my ( $pr, $pw, $wr, $ww ) = @_;
182              
183 352         402 my $pp; @$pp[_READER, _WRITER] = ( $pr, $pw );
  352         1212  
184 352         1588 my $wp; @$wp[_READER, _WRITER] = ( $wr, $ww );
  352         966  
185              
186 352         423 my $pair; @$pair[_PARENT, _WORKER] = ( $pp, $wp );
  352         774  
187 352         2048 $pair
188             }
189              
190             sub _wait {
191 146     146   296 my ( $pids ) = @_;
192              
193 146         226 my $errors = 0;
194              
195 146         2268 until( keys %$pids == 0 ) {
196 146         47131079 my $pid = waitpid -1, 0;
197 146 50       3331 die 'waitpid: ', $!
198             if $pid == -1;
199              
200 146         3175 my $name = delete $pids->{$pid};
201             next
202 146 50       1288 unless defined $name;
203              
204 146         4374 my $rc = $? >> 8;
205 146 100       1388 $errors++
206             unless $rc == 0;
207             }
208              
209 146 100       1406 die "One or more loaders failed"
210             if $errors;
211             }
212              
213              
214             1
215             __END__