File Coverage

blib/lib/Parallel/Subs.pm
Criterion Covered Total %
statement 90 127 70.8
branch 19 44 43.1
condition 5 16 31.2
subroutine 13 15 86.6
pod 7 7 100.0
total 134 209 64.1


line stmt bran cond sub pod time code
1             package Parallel::Subs;
2             $Parallel::Subs::VERSION = '0.002';
3 44     44   1555215 use strict;
  44         263  
  44         1421  
4 44     44   239 use warnings;
  44         95  
  44         1479  
5              
6 44     44   26237 use Parallel::ForkManager;
  44         1397764  
  44         2048  
7 44     44   29400 use Sys::Info;
  44         327209  
  44         250  
8              
9             # ABSTRACT: Simple way to run subs in parallel and process their return value in perl
10              
11              
12             sub new {
13 102     102 1 109419 my ( $class, %opts ) = @_;
14              
15 102         648 my $self = bless {}, __PACKAGE__;
16              
17 102         1114 $self->_init(%opts);
18              
19 102         670 return $self;
20             }
21              
22             sub _init {
23 102     102   650 my ( $self, %opts ) = @_;
24              
25 102         938 $self->_pfork(%opts);
26 102         452 $self->{result} = {};
27             $self->{pfork}->run_on_finish(
28             sub {
29 243     243   44210119 my ( $pid, $exit, $id, $exit_signal, $core_dump, $data ) = @_;
30 243 50 33     4432 die "Failed to process on one job, stop here !"
31             if $exit || $exit_signal;
32 243         3880 $self->{result}->{$id} = $data->{result};
33             }
34 102         2019 );
35 102         1652 $self->{jobs} = [];
36 102         416 $self->{callbacks} = [];
37              
38 102         1244 return $self;
39             }
40              
41             sub _pfork {
42 102     102   467 my ( $self, %opts ) = @_;
43              
44 102         270 my $cpu;
45 102 100       734 if ( defined $opts{max_process} ) {
46 19         171 $cpu = $opts{max_process};
47             }
48             else {
49 83   100     735 my $factor = $opts{max_process_per_cpu} || 1;
50 83         275 eval { $cpu = Sys::Info->new()->device('CPU')->count() * $factor; };
  83         1554  
51             }
52 102 100       843354 if ( defined $opts{max_memory} ) {
53 11         99 my $free_mem;
54 11         33 eval {
55 11         8921 require Sys::Statistics::Linux::MemStats;
56 11         8547 $free_mem = Sys::Statistics::Linux::MemStats->new->get->{realfree};
57             };
58 11         4499 my $max_mem = $opts{max_memory} * 1024; # 1024 **2 = 1 GO => expr in Kb
59 11         66 my $cpu_for_mem;
60 11 50       110 if ($@) {
61              
62             #warn "Cannot guess amount of available free memory need Sys::Statistics::Linux::MemStats\n";
63 0         0 $cpu_for_mem = 2;
64             }
65             else {
66 11         55 $cpu_for_mem = int( $free_mem / $max_mem );
67             }
68              
69             # min
70 11 50       55 $cpu = ( $cpu_for_mem < $cpu ) ? $cpu_for_mem : $cpu;
71             }
72 102   50     529 $cpu ||= 1;
73              
74 102         792 $self->{cpu} = $cpu;
75              
76             # we could also set a minimum amount of required memory
77 102         2085 $self->{pfork} = Parallel::ForkManager->new( int($cpu) );
78             $self->{pfork}->set_waitpid_blocking_sleep(0)
79 102 50       79427 unless $opts{waitpid_blocking_sleep};
80              
81 102         1041 return $self;
82             }
83              
84              
85             sub add {
86 435     435 1 65899 my ( $self, $code, $test ) = @_;
87              
88 435 50 33     2898 return unless $code && ref $code eq 'CODE';
89             push(
90 435         1906 @{ $self->{jobs} },
91 435         810 { name => ( scalar( @{ $self->{jobs} } ) + 1 ), code => $code }
  435         1806  
92             );
93 435         950 push( @{ $self->{callbacks} }, $test );
  435         867  
94              
95 435         1974 return $self;
96             }
97              
98              
99             sub total_jobs {
100 9     9 1 82 my ($self) = @_;
101              
102 9         70 return scalar @{ $self->{jobs} };
  9         75  
103             }
104              
105              
106             sub wait_for_all_optimized {
107 0     0 1 0 my ($self) = @_;
108              
109 0 0       0 return $self unless $self->total_jobs;
110 0         0 my @original_jobs = @{ $self->{jobs} };
  0         0  
111              
112             # callback not supported for now
113 0 0       0 if ( scalar @{ $self->{callbacks} } ) {
  0         0  
114             warn "Callback not supported in this mode for now.\n"
115 0 0       0 if grep { defined $_ } @{ $self->{callbacks} };
  0         0  
  0         0  
116 0         0 $self->{callbacks} = [];
117             }
118              
119 0 0       0 my $cpu = $self->{cpu} or die;
120 0         0 my $jobs_per_cpu = int( scalar @original_jobs / $cpu );
121 0 0 0     0 ++$jobs_per_cpu if scalar @original_jobs % $cpu || !$jobs_per_cpu;
122              
123 0         0 my @new_jobs;
124              
125             my $generate_sub = sub {
126 0     0   0 my ( $from, $to ) = @_;
127              
128             return sub {
129              
130             #print "subprocess from $from to $to\n";
131 0         0 for ( my $i = $from ; $i <= $to ; ++$i ) {
132              
133             #print "running job $i\n";
134 0         0 $original_jobs[$i]->{code}->();
135             }
136 0         0 return;
137 0         0 };
138 0         0 };
139              
140 0         0 my ( $from, $to ) = ( 0, 0 );
141 0         0 foreach my $id ( 1 .. $cpu ) {
142 0         0 $to = $from + $jobs_per_cpu - 1;
143 0 0       0 $to = scalar(@original_jobs) - 1 if $to >= scalar(@original_jobs);
144              
145             #print "FROM $from - TO $to\n";
146 0         0 my $sub = $generate_sub->( $from, $to );
147              
148 0         0 push @new_jobs, { name => $id, code => $sub };
149 0         0 $from = $to + 1;
150             }
151              
152 0         0 $self->{jobs} = \@new_jobs;
153              
154 0         0 return $self->wait_for_all();
155             }
156              
157              
158             sub run {
159 102     102 1 3101 my ($self) = @_;
160              
161 102 50       233 return unless scalar @{ $self->{jobs} };
  102         591  
162 102         317 my $pfm = $self->{pfork};
163 102         232 for my $job ( @{ $self->{jobs} } ) {
  102         396  
164 356 100       307094 $pfm->start( $job->{name} ) and next;
165 40         84488 my $job_result = $job->{code}();
166              
167             # can be used to stop on first error
168 40         14080046 my $job_error = 0;
169 40         2668 $pfm->finish( $job_error, { result => $job_result } );
170             }
171              
172             # wait for all jobs
173 62         115651 $pfm->wait_all_children;
174              
175 62         2914 return $self->{result};
176             }
177              
178              
179             sub wait_for_all {
180 27     27 1 12376 my ($self) = @_;
181              
182             # run callbacks
183 27 50       127 die "Cannot run callbacks" unless $self->run();
184              
185 9 50       224 return $self unless $self->total_jobs;
186 9         36 my $c = 0;
187              
188 9         143 my $results = $self->results();
189              
190 9         35 foreach my $callback ( @{ $self->{callbacks} } ) {
  9         97  
191 23 100       5040 next unless $callback;
192 17 50       102 die "cannot find result for #${c}" unless exists $results->[$c];
193 17         57 my $res = $results->[ $c++ ];
194              
195 17 50       175 if ( ref $callback eq 'HASH' ) {
    50          
196              
197             # internal mechanism
198             return
199 0 0 0     0 unless defined $callback->{test} && defined $callback->{args};
200              
201 0         0 my @args = ( $res, @{ $callback->{args} } );
  0         0  
202 0         0 my $t = $callback->{test};
203 0         0 my $str = join( ', ', map { "\$args[$_]" } ( 0 .. $#args ) );
  0         0  
204 0         0 eval "$t(" . $str . ")";
205             }
206             elsif ( ref $callback eq 'CODE' ) {
207              
208             # execute user function
209 17         107 $callback->($res);
210             }
211              
212             }
213              
214 9         15254 return $self;
215             }
216              
217              
218             sub results {
219 62     62 1 432 my ($self) = @_;
220              
221             my @sorted =
222 237         823 map { $self->{result}{$_} }
223 62         357 sort { int($a) <=> int($b) } keys %{ $self->{result} };
  295         1239  
  62         1522  
224 62         1801 return \@sorted;
225             }
226              
227             1;
228              
229             __END__