File Coverage

blib/lib/Proc/Queue.pm
Criterion Covered Total %
statement 161 227 70.9
branch 76 192 39.5
condition 27 66 40.9
subroutine 28 35 80.0
pod 15 19 78.9
total 307 539 56.9


line stmt bran cond sub pod time code
1             package Proc::Queue;
2              
3             require 5.006;
4              
5             our $VERSION = '1.23';
6              
7 48     48   43728 use strict;
  48         96  
  48         2160  
8             # use warnings;
9             require Exporter;
10 48     48   240 use Carp;
  48         48  
  48         5040  
11 48     48   54384 use POSIX ":sys_wait_h";
  48         403392  
  48         336  
12 48     48   97296 use Errno qw(ENOENT EPERM);
  48         57984  
  48         10560  
13              
14             our @ISA = qw(Exporter);
15             our %EXPORT_TAGS = ( all => [ qw( fork_now
16             weight
17             waitpids
18             run_back
19             run_back_now
20             system_back
21             system_back_now
22             all_exit_ok
23             running_now ) ] );
24              
25             our @EXPORT_OK = ( @{ $EXPORT_TAGS{'all'} } );
26             our @EXPORT = qw();
27              
28             # are we running on Windows?
29 48     48   288 use constant _win => $^O=~/win32/i;
  48         96  
  48         11952  
30              
31             # parameters
32             my $queue_size=4; # max number of councurrent processes running.
33             my $debug=0; # shows debug info.
34             my $trace=0; # shows function calls.
35             my $delay=0; # delay between fork calls.
36             my $ignore_children=0; # similar to $SIG{CHILD}='IGNORE';
37              
38             my $weight=1;
39             my $allow_excess=1;
40              
41             my $last=0; # last time fork was called.
42              
43             # module status
44             my $queue_now=0;
45             my %process;
46             my @captured;
47              
48             # set STDERR as unbuffered so all the carp calls work as expected
49             { my $oldfh=select STDERR; $|=1; select $oldfh }
50              
51             # extended import to support parameter configuration from use statment
52             sub import {
53 48     48   528 my ($pkg,@opts)=@_;
54 48         96 my $i;
55 48         336 for ($i=0; $i<@opts; $i++) {
56 96         240 my $o=$opts[$i];
57 96 50 66     1824 if( $o eq 'size'
      66        
      33        
      33        
      33        
      33        
      33        
58             or $o eq 'debug'
59             or $o eq 'trace'
60             or $o eq 'delay'
61             or $o eq 'weight'
62             or $o eq 'ignore_childs'
63             or $o eq 'ignore_children'
64             or $o eq 'allow_excess') {
65 48 50       240 $#opts>$i or croak "option '$o' needs a value";
66 48         192 my $value=$opts[$i+1];
67 48     48   240 { no strict qw( subs refs );
  48         48  
  48         61920  
  48         96  
68 48         288 &$o($value) }
69 48         288 splice @opts,$i--,2;
70             }
71             }
72 48 50       144 carp "Exporting '".join("', '",@opts)."' symbols from Proc::Queue" if $debug;
73 48         192 @_=($pkg,@opts);
74 48         8736 goto &Exporter::import;
75             }
76              
77             sub delay {
78 0 0   0 1 0 return $delay unless @_;
79 0         0 my $old_delay=$delay;
80 0         0 $delay=$_[0];
81 0 0       0 carp "Proc queue delay set to ${delay}s, it was $old_delay" if $debug;
82 0         0 $old_delay;
83             }
84              
85             sub size {
86 80 50   80 1 5173 return $queue_size unless @_;
87 80         160 my $size=shift;
88 80         112 my $old_size=$queue_size;
89 80 50       298 croak "invalid value for Proc::Queue size ($size), min value is 1"
90             unless $size >= 1;
91 80         112 $queue_size=$size;
92 80 50       277 carp "Proc queue size set to $size, it was $old_size" if $debug;
93 80         208 $old_size;
94             }
95              
96             sub weight {
97 21 50   21 1 399 return $weight unless @_;
98 21         105 my $old_weight=$weight;
99 21 50       273 croak "invalid value for Proc::Queue weight ($_[0]), min value is 1"
100             unless int($_[0])>=1;
101 21         147 $weight=int($_[0]);
102 21 50       336 carp "Proc weight set to $weight, it was $old_weight" if $debug;
103 21         126 $old_weight;
104             }
105              
106             sub debug {
107 0 0   0 1 0 return $debug unless @_;
108 0         0 my $old_debug=$debug;
109 0 0       0 if ($_[0]) {
110 0         0 $debug=1;
111 0         0 carp "debug mode ON";
112             }
113             else {
114 0         0 $debug=0;
115 0 0       0 carp "debug mode OFF" if $old_debug;
116             }
117 0         0 return $old_debug;
118             }
119              
120             sub trace {
121 0 0   0 1 0 return $trace unless @_;
122 0         0 my $old_trace=$trace;
123 0 0       0 if ($_[0]) {
124 0         0 $trace=1;
125 0 0       0 carp "trace mode ON" if $debug;
126             }
127             else {
128 0         0 $trace=0;
129 0 0       0 carp "trace mode OFF" if $debug;
130             }
131 0         0 return $old_trace;
132             }
133              
134             sub allow_excess {
135 11 50   11 1 187 return $allow_excess unless @_;
136 11         55 my $old_allow_excess=$allow_excess;
137 11 50       77 if ($_[0]) {
138 0         0 $allow_excess=1;
139 0 0       0 carp "allow_excess mode ON" if $debug;
140             }
141             else {
142 11         22 $allow_excess=0;
143 11 50       77 carp "allow_excess mode OFF" if $debug;
144             }
145 11         66 $old_allow_excess;
146             }
147              
148             sub ignore_children {
149 0 0   0 1 0 return $ignore_children unless @_;
150 0         0 my $old_ignore_children=$ignore_children;
151 0 0       0 if ($_[0]) {
152 0         0 $ignore_children=1;
153 0 0       0 carp "ignore_children mode ON" if $debug;
154             }
155             else {
156 0         0 $ignore_children=0;
157 0 0       0 carp "ignore_children mode OFF" if $debug;
158             }
159 0         0 return $old_ignore_children;
160             }
161              
162             *ignore_childs = \&ignore_children;
163              
164             # sub to store internally captured processes
165             sub _push_captured {
166 694 50   694   12156 push @captured, shift,$? unless $ignore_children;
167 694 50       49896 croak "captured stack is corrupted" if (@captured & 1)
168             }
169              
170             # do the real wait and housekeeping
171             sub _wait () {
172 735 50 33 735   3771 carp "Proc::Queue::_wait private function called" if $debug && $trace;
173 735 50       2615 carp "Waiting for child processes to exit" if $debug;
174 735         500499319 my $w=CORE::wait;
175 735 100       6276 if ($w != -1) {
176 692 50       9003 if(exists $process{$w}) {
177 692         6307 $queue_now -= delete($process{$w});
178 692 50       4732 carp "Process $w has exited, $queue_now processes running now" if $debug;
179             }
180             else {
181 0 0       0 carp "Unknow process $w has exited, ignoring it" if $debug;
182             }
183             }
184             else {
185 43 50       516 carp "No child processes left, continuing" if $debug;
186             }
187 735         7279 return $w;
188             }
189              
190             sub new_wait () {
191 258 50   258 0 4601 carp "Proc::Queue::wait called" if $trace;
192 258 100       2064 if(@captured) {
193 129         989 my $w=shift @captured;
194 129         817 $?=shift @captured;
195 129 50       989 carp "Wait returning old child $w captured in wait" if $debug;
196 129         344 return $w;
197             }
198 129         1118 return _wait;
199             }
200              
201             sub _waitpid ($$) {
202 1434     1434   8753 my ($pid,$flags)=@_;
203 1434 50 33     6215 carp "Proc::Queue::_waitpid($pid,$flags) private function called" if $debug && $trace;
204 1434 50       3864 carp "Waiting for child process $pid to exit" if $debug;
205 1434         335956923 my $w=CORE::waitpid($pid,$flags);
206 1434 50       4414 if ($w != -1) {
207 1434 100       9820 if(exists $process{$w}) {
208 349         3831 $queue_now -= delete($process{$w});
209 349 50       3887 carp "Process $w has exited, $queue_now processes running now" if $debug;
210             }
211             else {
212 1085 50       2724 carp "Unknow process $w has exited, ignoring it" if $debug;
213             }
214             }
215             else {
216 0 0       0 carp "No child processes left, continuing" if $debug;
217             }
218 1434         6941 return $w;
219             }
220              
221             sub _clean() {
222 1085     1085   1806 my $pid;
223 1085         1716 while (1) {
224 1173         30987 $pid=_waitpid(-1,WNOHANG);
225 1173 100 33     49186 return unless ((_win && $pid < -1) || $pid >0);
      66        
226 88         1408 _push_captured $pid
227             }
228             }
229              
230             sub new_waitpid ($$) {
231 463     463 0 2801 my ($pid,$flags)=@_;
232 463 50       945 carp "Proc::Queue::waitpid called" if $trace;
233 463         2781 foreach my $i (0..$#captured) {
234 9674 100       19955 next if $i&1;
235 4938         4367 my $r;
236 4938 100 66     20895 if ($pid==$captured[$i] or $pid==-1) {
237 202 50       532 croak "corrupted captured stack" unless ($#captured & 1);
238 202         1778 ($r,$?)=splice @captured,$i,2;
239 202         1393 return $r;
240             }
241             }
242 261         1514 return _waitpid($pid,$flags);
243             }
244              
245             sub new_exit (;$ ) {
246 47 50   47 0 56063453 my $e=@_?shift:0;
247 47 0       327841 carp "Proc::Queue::exit(".(defined($e)?$e:'undef').") called" if $trace;
    50          
248 47 0       1179 carp "Process $$ exiting with value ".(defined($e)?$e:'undef') if $debug;
    50          
249 47         0 return CORE::exit($e);
250             }
251              
252             # use Time::Hires::time if available;
253 48     48   2640 BEGIN { eval "use Time::HiRes 'time'" }
  48     48   43488  
  48         95616  
  48         192  
254              
255             sub _fork () {
256 1175 50 33 1175   6758 carp "Proc::Queue::_fork called" if $trace && $debug;
257 1175 50       3629 if ($delay>0) {
258 0         0 my $wait=$last+$delay - time;
259 0 0       0 if ($wait>0) {
260 0 0       0 carp "Delaying $wait seconds before forking" if $debug;
261 0         0 select (undef,undef,undef,$wait);
262             }
263 0         0 $last=time;
264             }
265 1175         1569650 my $f=CORE::fork;
266 1175 50       86946 if (defined($f)) {
267 1175 100       5359 if($f == 0) {
268 47 50       4862 carp "Process $$ now running" if $debug;
269             # reset queue internal vars in child proccess;
270 47         1002 $queue_size=1;
271 47         501 $queue_now=0;
272 47         3895 %process=();
273 47         4384 @captured=();
274             }
275             else {
276 1128         84775 $process{$f}=$weight;
277 1128         8341 $queue_now+=$weight;
278 1128 50       4772 carp "Child forked (pid=$f), $queue_now processes running now" if $debug;
279             }
280             }
281             else {
282 0 0       0 carp "Fork failed: $!" if $debug;
283             }
284 1175         85565 return $f;
285             }
286              
287             sub new_fork () {
288 767 50   767 0 14759 carp "Proc::Queue::fork called" if $trace;
289 767 100 100     9522 while($queue_now and
290             $queue_now + ($allow_excess ? 1 : $weight) > $queue_size) {
291 606 50       2911 carp "Waiting for some process to finish" if $debug;
292 606         974 my $nw;
293 606 50       4722 if (($nw=_wait) != -1) {
294 606         6047 _push_captured $nw;
295             }
296             else {
297 0         0 carp "Proc queue seems to be corrupted, $queue_now children lost";
298 0         0 last;
299             }
300             }
301 767         3147 return _fork;
302             }
303              
304             sub fork_now () {
305 408 50   408 1 21270 carp "Proc::Queue::fork_now called" if $trace;
306 408         1006 return _fork;
307             }
308              
309             sub waitpids {
310 84 50   84 1 756 carp "Proc::Queue::waitpids(".join(", ",@_).")" if $trace;
311 84         1653 my @result;
312 84         1041 foreach my $pid (@_) {
313 442 50       1439 if (defined($pid)) {
314 442 50       1297 carp "Waiting for child $pid to exit" if $debug;
315 442         1394 my $r=new_waitpid($pid,0);
316 442 50 33     7196 if ((_win && $r < -1) || $r > 0) {
      33        
317 442 50       1480 carp "Child $r return $?" if $debug;
318 442         3881 push @result,$r,$?;
319             }
320             else {
321 0 0       0 carp "No such child returned while waiting for $pid" if $debug;
322 0         0 push @result,$r,undef;
323             }
324             }
325             else {
326 0         0 carp "Undef arg found";
327 0         0 push @result,undef,undef
328             };
329             }
330 84         1166 return @result;
331             }
332              
333              
334             sub _run_back {
335 538     538   1125 my ($now,$code)=@_;
336 538 50 33     2452 carp "Proc::Queue::_run_back($now,$code) called" if $trace and $debug;
337 538 100       4054 my $f=$now ? fork_now : new_fork;
338 538 100 66     12876 if(defined($f) and $f==0) {
339 31 50       477 carp "Running code $code in forked child $$" if $debug;
340 31         824 $?=0;
341 31         550 eval {
342 31         1274 &$code();
343             };
344 0 0       0 if ($@) {
345 0 0       0 carp "Uncaught exception $@" if $debug;
346 0         0 new_exit(255)
347             }
348             else {
349 0 0 0     0 carp "Code $code in child $$ returns '$?'" if $? && $debug;
350             }
351 0         0 new_exit($?)
352             }
353 507         21393 return $f;
354             }
355              
356             sub run_back (&) {
357 515     515 1 36701 my $code=shift;
358 515 50       1829 carp "Proc::Queue::run_back($code) called" if $trace;
359 515         1441 return _run_back(0,$code)
360             }
361              
362             sub run_back_now (&) {
363 23     23 1 1679 my $code=shift;
364 23 50       552 carp "Proc::Queue::run_back_now($code) called" if $trace;
365 23         483 return _run_back(1,$code)
366             }
367              
368             sub _system_back {
369 0     0   0 my $now=shift;
370 0 0 0     0 carp "Proc::Queue::_system_back($now, ".join(", ",@_).") called" if $trace and $debug;
371 0         0 if (0 and @_ > 1) { # TODO, search command on the PATH
372             unless (-e $_[0]) {
373             carp "command '$_[0]' not found" if $debug;
374             $! = ENOENT;
375             return undef;
376             }
377             unless (-f _ and -x _) {
378             carp "permission to execute command '$_[0]' denied" if $debug;
379             $! = EPERM;
380             return undef;
381             }
382             }
383 0 0       0 my $f=$now ? fork_now : new_fork;
384 0 0 0     0 if(defined($f) and $f==0) {
385 0 0       0 carp "Running exec(".join(", ",@_).") in forked child $$" if $debug;
386 0         0 { exec(@_) }
  0         0  
387 0 0       0 carp "exec(".join(", ",@_).") failed" if $debug;
388 0         0 require POSIX;
389 0         0 POSIX::_exit(255);
390             }
391 0         0 return $f;
392             }
393              
394             sub system_back {
395 0 0   0 1 0 carp "Proc::Queue::system_back(".join(", ",@_).") called" if $trace;
396 0         0 return _system_back(0,@_);
397             }
398              
399             sub system_back_now {
400 0 0   0 1 0 carp "Proc::Queue::system_back_now(".join(", ",@_).") called" if $trace;
401 0         0 return _system_back(1,@_);
402             }
403              
404             sub all_exit_ok {
405 84 50   84 1 5667 carp "Proc::Queue::all_exit_ok(".join(", ",@_).")" if $trace;
406 84         914 my @result=waitpids(@_);
407 84         527 my $i;
408 84         459 for($i=0; $i<@result; $i++) {
409 884 100       3951 next unless $i&1;
410 442 100 66     4980 if (!defined($result[$i]) or $result[$i]) {
411 22 50       352 carp "Child ".$_[$i>>1]." fail with code $result[$i], waitpid return $result[$i-1]" if $debug;
412 22         330 return 0;
413             }
414             }
415 62 50       247 carp "All children run ok" if $debug;
416 62         618 return 1;
417             }
418              
419             # this function is mostly for testing pourposes:
420             sub running_now () {
421 1085     1085 1 82183 _clean;
422 1085         2413 return $queue_now;
423             }
424              
425             *CORE::GLOBAL::wait = \&new_wait;
426             *CORE::GLOBAL::waitpid = \&new_waitpid;
427             *CORE::GLOBAL::exit = \&new_exit;
428             *CORE::GLOBAL::fork = \&new_fork;
429              
430              
431             1;
432              
433             __END__