File Coverage

blib/lib/Proc/Forkmap.pm
Criterion Covered Total %
statement 21 65 32.3
branch 0 14 0.0
condition 0 8 0.0
subroutine 7 13 53.8
pod 4 4 100.0
total 32 104 30.7


line stmt bran cond sub pod time code
1             package Proc::Forkmap;
2 1     1   21910 use POSIX qw(:sys_wait_h);
  1         6830  
  1         5  
3 1     1   2734 use Proc::Fork;
  1         2789  
  1         9  
4 1     1   178 use Carp;
  1         7  
  1         70  
5 1     1   1676 use IO::Pipe;
  1         10111  
  1         27  
6 1     1   8 use strict;
  1         1  
  1         24  
7 1     1   5 use warnings;
  1         1  
  1         20  
8 1     1   19 use 5.010;
  1         2  
  1         626  
9              
10             our $VERSION = '0.024';
11              
12              
13             sub new {
14 0     0 1   my $class = shift;
15 0           my $self = bless {@_}, $class;
16 0           $self->_init;
17 0           return $self;
18             }
19              
20              
21             sub _init {
22 0     0     my $self = shift;
23 0 0 0       ($self->{max_kids} //= 2) =~ /^[1-9]+$/
24             or croak "max_kids value error";
25 0   0       $self->{ipc} //= 0; #ipc off by default
26             }
27              
28              
29             sub max_kids {
30 0     0 1   my ($self,$n) = @_;
31 0   0       $n // return $self->{max_kids};
32 0 0         ($n =~ /^[1-9]+$/)
33             or croak "max_kids value error";
34 0           $self->{max_kids} = $n;
35             }
36              
37              
38             sub ipc {
39 0     0 1   my ($self,$n) = @_;
40 0   0       $n // return $self->{ipc};
41 0           $self->{ipc} = $n;
42             }
43              
44              
45             sub fmap {
46 0     0 1   my ($self,$code) = (shift,shift);
47 0           my %pids = ();
48 0           my @rs = (); #result set of child return values
49 0           my $max = $self->max_kids;
50 0           my $ipc = $self->ipc;
51 0           for my $proc (@_) {
52 0 0         my $pipe = $ipc ? IO::Pipe->new : {}; #put this in your pipe, and smoke it
53             #max kids?
54 0           while ($max == keys %pids) {
55             #free a spot in queue when a process completes
56 0           for my $pid (keys %pids) {
57 0 0         if (my $kid = waitpid($pid, WNOHANG)) {
58 0           delete $pids{$kid};
59 0           last;
60             }
61             }
62             }
63            
64             run_fork { #processes fork here
65             parent {
66 0           $| = 1;
67 0           my $kid = shift;
68 0           $pids{$kid}++;
69 0 0         if ($ipc) {
70 0           $pipe->reader();
71 0           while(<$pipe>) {
72 0           push @rs,$_;
73             }
74             }
75             }
76             child {
77 0           $| = 1;
78 0           my $rs = $code->($proc);
79 0 0         if ($ipc) {
80 0           $pipe->writer();
81 0 0         print $pipe $rs if defined $rs;
82             }
83 0           exit;
84             }
85             error {
86 0           die "error: couldn't fork";
87             }
88 0     0     };
  0            
89             }
90            
91 0           1 while (wait() != -1); #wait for the stragglers to finish
92 0           return @rs;
93             }
94              
95              
96              
97              
98             1;
99              
100             __END__