File Coverage

blib/lib/Proc/Forkmap.pm
Criterion Covered Total %
statement 20 124 16.1
branch 0 30 0.0
condition 0 16 0.0
subroutine 7 18 38.8
pod 7 7 100.0
total 34 195 17.4


line stmt bran cond sub pod time code
1             package Proc::Forkmap;
2 1     1   64004 use POSIX qw(:sys_wait_h);
  1         5856  
  1         4  
3 1     1   1660 use Proc::Fork;
  1         1784  
  1         4  
4 1     1   137 use Carp;
  1         2  
  1         43  
5 1     1   395 use IO::Pipe;
  1         6721  
  1         28  
6 1     1   6 use strict;
  1         2  
  1         15  
7 1     1   4 use warnings;
  1         2  
  1         20  
8 1     1   13 use 5.010;
  1         3  
9             require Exporter;
10             our @ISA = qw(Exporter);
11             our @EXPORT_OK = qw(forkmap);
12              
13             our $MAX_PROC = 4;
14             our $VERSION = '0.2008';
15             our $TIMEOUT = 0;
16              
17             sub new {
18 0     0 1   my $class = shift;
19 0           my $self = bless {@_}, $class;
20 0           $self->_init;
21 0           return $self;
22             }
23              
24             sub _init {
25 0     0     my $self = shift;
26 0   0       $self->{max_kids} //= 4;
27 0   0       $self->{ipc} //= 0;
28 0   0       $self->{non_blocking} //= 1;
29 0   0       $self->{timeout} //= 0;
30             }
31              
32             sub max_kids {
33 0     0 1   my ($self, $n) = @_;
34 0   0       $n // return $self->{max_kids};
35 0           $self->{max_kids} = $n;
36             }
37              
38             sub non_blocking {
39 0     0 1   my ($self, $n) = @_;
40 0   0       $n // return $self->{non_blocking};
41 0           $self->{non_blocking} = $n;
42             }
43              
44             sub ipc {
45 0     0 1   my ($self, $n) = @_;
46 0   0       $n // return $self->{ipc};
47 0           $self->{ipc} = $n;
48             }
49              
50             sub timeout {
51 0     0 1   my ($self, $n) = @_;
52 0   0       $n // return $self->{timeout};
53 0           $self->{timeout} = $n;
54             }
55              
56             sub fmap {
57 0     0 1   my ($self, $code) = (shift, shift);
58 0           my %pids = ();
59 0           my @rs = (); # result set of child return values
60 0           my $max = $self->max_kids;
61 0           my $ipc = $self->ipc;
62 0           my $timeout = $self->timeout;
63 0           my $non_blocking = $self->non_blocking;
64 0           for my $proc (@_) {
65 0 0         my $pipe = $ipc ? IO::Pipe->new : {};
66             # max kids?
67 0           while ($max == keys %pids) {
68             # free a spot in queue when a process completes
69 0           for my $pid (keys %pids) {
70 0 0         if (my $kid = waitpid($pid, WNOHANG)) {
71 0           delete $pids{$kid};
72 0           last;
73             }
74             }
75             }
76             my $fn = sub {
77 0     0     my $rs = shift;
78 0 0         if ($ipc) {
79 0           $pipe->writer();
80 0           $pipe->autoflush;
81 0 0         print $pipe $rs if defined $rs;
82             }
83 0           return 1;
84 0           };
85             run_fork { # processes fork here
86             parent {
87 0           my $kid = shift;
88 0           $pids{$kid}++;
89 0 0         if ($ipc) {
90 0           $pipe->reader();
91 0 0         if ($non_blocking) {
92 0           $pipe->blocking(0);
93             } else {
94 0           $pipe->blocking(1);
95             }
96 0           while(<$pipe>) {
97 0           push @rs, $_;
98             }
99             }
100             }
101             child {
102 0 0         if (!$timeout) {
103 0           my $rs = $code->($proc);
104 0           $fn->($rs);
105 0           exit;
106             }
107 0           eval {
108             local $SIG{ALRM} = sub {
109 0           die "alarm\n"
110 0           };
111 0           alarm $timeout;
112 0           my $rs = $code->($proc);
113 0           $fn->($rs);
114 0           alarm 0;
115             };
116 0 0         if ($@) {
117 0 0         if ($@ eq "alarm\n") {
118 0           print STDERR "error: timeout $$\n";
119             }
120 0 0         die unless $@ eq "alarm\n";
121             }
122 0           exit;
123             }
124             error {
125 0           die "error: couldn't fork";
126             }
127 0     0     };
  0            
128             }
129 0           1 while (wait() != -1); # wait for the stragglers to finish
130 0           return @rs;
131             }
132              
133             sub forkmap (&@) {
134 0     0 1   my ($code, @args) = @_;
135 0           my %pids = ();
136 0           my $max = $MAX_PROC;
137 0           for my $proc (@args) {
138 0           while ($max == keys %pids) {
139 0           for my $pid (keys %pids) {
140 0 0         if (my $kid = waitpid($pid, WNOHANG)) {
141 0           delete $pids{$kid};
142 0           last;
143             }
144             }
145             }
146             run_fork {
147             parent {
148 0           my $kid = shift;
149 0           $pids{$kid}++;
150             }
151             child {
152 0 0         if (!$TIMEOUT) {
153 0           local $_ = $proc;
154 0           $code->();
155 0           exit;
156             }
157 0           eval {
158             local $SIG{ALRM} = sub {
159 0           die "alarm\n"
160 0           };
161 0           alarm $TIMEOUT;
162 0           local $_ = $proc;
163 0           $code->();
164 0           alarm 0;
165             };
166 0 0         if ($@) {
167 0 0         if ($@ eq "alarm\n") {
168 0           print STDERR "error: timeout $$\n";
169             }
170 0 0         die unless $@ eq "alarm\n";
171             }
172 0           exit;
173             }
174             error {
175 0           die "error: couldn't fork";
176             }
177 0     0     };
  0            
178             }
179 0           1 while (wait() != -1);
180 0           return 1;
181             }
182              
183             1;
184              
185             __END__