File Coverage

blib/lib/MCE/Child.pm
Criterion Covered Total %
statement 377 521 72.3
branch 179 406 44.0
condition 51 161 31.6
subroutine 55 74 74.3
pod 23 23 100.0
total 685 1185 57.8


line stmt bran cond sub pod time code
1             ###############################################################################
2             ## ----------------------------------------------------------------------------
3             ## A threads-like parallelization module compatible with Perl 5.8.
4             ##
5             ###############################################################################
6              
7 12     12   899802 use strict;
  12         154  
  12         366  
8 12     12   60 use warnings;
  12         12  
  12         354  
9              
10 12     12   58 no warnings qw( threads recursion uninitialized once redefine );
  12         26  
  12         791  
11              
12             package MCE::Child;
13              
14             our $VERSION = '1.889';
15              
16             ## no critic (BuiltinFunctions::ProhibitStringyEval)
17             ## no critic (Subroutines::ProhibitExplicitReturnUndef)
18             ## no critic (Subroutines::ProhibitSubroutinePrototypes)
19             ## no critic (TestingAndDebugging::ProhibitNoStrict)
20              
21 12     12   5272 use MCE::Signal ();
  12         24  
  12         415  
22 12     12   5557 use MCE::Mutex ();
  12         24  
  12         236  
23 12     12   5531 use MCE::Channel ();
  12         57  
  12         350  
24 12     12   94 use Time::HiRes 'sleep';
  12         23  
  12         71  
25              
26             use overload (
27             q(==) => \&equal,
28 0     0   0 q(!=) => sub { !equal(@_) },
29 12         131 fallback => 1
30 12     12   16408 );
  12         11600  
31              
32             sub import {
33 12 50   12   152 if (caller !~ /^MCE::/) {
34 12     12   1360 no strict 'refs'; no warnings 'redefine';
  12     12   23  
  12         521  
  12         114  
  12         67  
  12         1218  
35 12         13 *{ caller().'::mce_child' } = \&mce_child;
  12         81  
36             }
37 12         163 return;
38             }
39              
40             ## The POSIX module has many symbols. Try not loading it simply
41             ## to have WNOHANG. The following covers most platforms.
42              
43             use constant {
44 12 50       59553 _WNOHANG => ( $INC{'POSIX.pm'} )
    50          
45             ? &POSIX::WNOHANG : ( $^O eq 'solaris' ) ? 64 : 1
46 12     12   71 };
  12         24  
47              
48             my ( $_MNGD, $_DATA, $_DELY, $_LIST ) = ( {}, {}, {}, {} );
49              
50             my $_is_MSWin32 = ( $^O eq 'MSWin32' ) ? 1 : 0;
51             my $_tid = ( $INC{'threads.pm'} ) ? threads->tid() : 0;
52              
53             sub CLONE {
54 0 0   0   0 $_tid = threads->tid(), &_clear() if $INC{'threads.pm'};
55             }
56              
57             sub _clear {
58 0     0   0 %{ $_LIST } = ();
  0         0  
59             }
60              
61             sub _max_workers {
62 10     10   17 my ( $cpus ) = @_;
63 10 100       49 if ( $cpus eq 'auto' ) {
    100          
64 1         4 $cpus = MCE::Util::get_ncpu();
65             }
66             elsif ( $cpus =~ /^([0-9.]+)%$/ ) {
67 6         25 my ( $percent, $ncpu ) = ( $1 / 100, MCE::Util::get_ncpu() );
68 6         24 $cpus = $ncpu * $percent + 0.5;
69             }
70 10 100 66     80 $cpus = 1 if $cpus !~ /^[\d\.]+$/ || $cpus < 1;
71 10         21 return int($cpus);
72             }
73              
74             ###############################################################################
75             ## ----------------------------------------------------------------------------
76             ## Init routine.
77             ##
78             ###############################################################################
79              
80             bless my $_SELF = { MGR_ID => "$$.$_tid", WRK_ID => $$ }, __PACKAGE__;
81              
82             sub init {
83 19 100 66 19 1 4068 shift if ( defined $_[0] && $_[0] eq __PACKAGE__ );
84              
85             # -- options ----------------------------------------------------------
86             # max_workers child_timeout posix_exit on_start on_finish void_context
87             # ---------------------------------------------------------------------
88              
89 19 100       220 my $pkg = "$$.$_tid.".( caller eq __PACKAGE__ ? caller(1) : caller );
90 19 50       233 my $mngd = $_MNGD->{$pkg} = ( ref $_[0] eq 'HASH' ) ? shift : { @_ };
91              
92 19         41 @_ = ();
93              
94             $mngd->{MGR_ID} = "$$.$_tid", $mngd->{PKG} = $pkg,
95 19         132 $mngd->{WRK_ID} = $$;
96              
97 19 100       91 &_force_reap($pkg), $_DATA->{$pkg}->clear() if ( exists $_LIST->{$pkg} );
98              
99 19 100       73 if ( !exists $_LIST->{$pkg} ) {
100 12 0 33     36 $MCE::_GMUTEX->lock() if ( $_tid && $MCE::_GMUTEX );
101 12 50       57 sleep 0.015 if $_tid;
102              
103             # Start the shared-manager process if not running.
104 12 50       58 MCE::Shared->start() if $INC{'MCE/Shared.pm'};
105              
106 12         70 my $chnl = MCE::Channel->new( impl => 'Mutex' );
107 12         117 $_LIST->{ $pkg } = MCE::Child::_ordhash->new();
108 12         93 $_DELY->{ $pkg } = MCE::Child::_delay->new( $chnl );
109 12         93 $_DATA->{ $pkg } = MCE::Child::_hash->new( $chnl );
110 12         480 $_DATA->{"$pkg:seed"} = int(rand() * 1e9);
111 12         48 $_DATA->{"$pkg:id" } = 0;
112              
113 12 0 33     47 $MCE::_GMUTEX->unlock() if ( $_tid && $MCE::_GMUTEX );
114             }
115              
116 19 50       55 if ( !exists $mngd->{posix_exit} ) {
117             $mngd->{posix_exit} = 1 if (
118             $^S || $_tid || $INC{'Mojo/IOLoop.pm'} ||
119             $INC{'Coro.pm'} || $INC{'LWP/UserAgent.pm'} || $INC{'stfl.pm'} ||
120             $INC{'Curses.pm'} || $INC{'CGI.pm'} || $INC{'FCGI.pm'} ||
121             $INC{'Tk.pm'} || $INC{'Wx.pm'} || $INC{'Win32/GUI.pm'} ||
122 19 50 33     1188 $INC{'Gearman/Util.pm'} || $INC{'Gearman/XS.pm'}
      33        
      33        
      33        
      33        
      33        
      33        
      33        
      33        
      33        
      33        
      33        
      33        
123             );
124             }
125              
126 19 100       82 if ( defined $mngd->{max_workers} ) {
127 3         7 $mngd->{max_workers} = _max_workers($mngd->{max_workers});
128             }
129              
130 19 50 33     87 if ( $INC{'LWP/UserAgent.pm'} && !$INC{'Net/HTTP.pm'} ) {
131 0         0 local $@; eval 'require Net::HTTP; require Net::HTTPS';
  0         0  
132             }
133              
134             require POSIX
135 19 50 66     5256 if ( $mngd->{on_finish} && !$INC{'POSIX.pm'} && !$_is_MSWin32 );
      66        
136              
137 19         40059 return;
138             }
139              
140             ###############################################################################
141             ## ----------------------------------------------------------------------------
142             ## 'new', 'mce_child', and 'create' for threads-like similarity.
143             ##
144             ###############################################################################
145              
146             ## 'new' and 'tid' are aliases for 'create' and 'pid' respectively.
147              
148             *new = \&create, *tid = \&pid;
149              
150             ## Use "goto" trick to avoid pad problems from 5.8.1 (fixed in 5.8.2)
151             ## Tip found in threads::async.
152              
153             sub mce_child (&;@) {
154 0     0 1 0 goto &create;
155             }
156              
157             sub create {
158 65   66 65 1 23180 my $mngd = $_MNGD->{ "$$.$_tid.".caller() } || do {
159             # construct mngd internally on first use unless defined
160             init(); $_MNGD->{ "$$.$_tid.".caller() };
161             };
162              
163 65 50       475 shift if ( $_[0] eq __PACKAGE__ );
164              
165             # ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~
166              
167 65 50       813 my $self = bless ref $_[0] eq 'HASH' ? { %{ shift() } } : { }, __PACKAGE__;
  0         0  
168              
169 65 50       712 $self->{IGNORE} = 1 if $SIG{CHLD} eq 'IGNORE';
170 65         764 $self->{MGR_ID} = $mngd->{MGR_ID}, $self->{PKG} = $mngd->{PKG};
171 65 50 33     686 $self->{ident } = shift if ( !ref $_[0] && ref $_[1] eq 'CODE' );
172              
173 65 0 33     152 my $func = shift; $func = caller().'::'.$func
  65   33     410  
174             if ( !ref $func && length $func && index($func,':') < 0 );
175              
176 65 50       402 if ( !defined $func ) {
177 0         0 local $\; print {*STDERR} "code function is not specified or valid\n";
  0         0  
  0         0  
178 0         0 return undef;
179             }
180              
181             my ( $list, $max_workers, $pkg ) = (
182             $_LIST->{ $mngd->{PKG} }, $mngd->{max_workers}, $mngd->{PKG}
183 65         289 );
184              
185 65 50       515 $_DATA->{"$pkg:id"} = 10000 if ( ( my $id = ++$_DATA->{"$pkg:id"} ) >= 2e9 );
186              
187             {
188             # Reap completed child processes.
189 65         135 local ($SIG{CHLD}, $!, $?, $_); map {
  65         2793  
190 2         2850624 waitpid($_, 0); _reap_child($list->del($_), 0); ();
  2         76  
  2         78  
191             }
192 65         842 $_DATA->{$pkg}->reap_data;
193             }
194              
195 65 50 33     951 if ( $max_workers || $self->{IGNORE} ) {
196             # Wait for a slot if saturated.
197 0 0 0     0 if ( $max_workers && $list->len() >= $max_workers ) {
198 0         0 my $count = $list->len() - $max_workers + 1;
199 0         0 _wait_one($pkg) for 1 .. $count;
200             }
201             }
202              
203             # ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~
204              
205 65 0 33     237 $MCE::_GMUTEX->lock() if ( $_tid && $MCE::_GMUTEX );
206              
207 65         160 my @args = @_; @_ = (); # To avoid (Scalars leaked: N) messages
  65         123  
208 65         126 my ( $killed, $pid );
209              
210             {
211 65     0   110 local $SIG{TERM} = local $SIG{INT} = sub { $killed = $_[0] }
  0         0  
212 65 50 33     2763 if ( !$_is_MSWin32 && $] ge '5.010001' );
213              
214             local $SIG{TTIN}, local $SIG{TTOU}, local $SIG{WINCH}
215 65 50       2518 if ( !$_is_MSWin32 );
216              
217 65         64330 $pid = fork();
218              
219 65 50       3751 if ( !defined $pid ) { # error
    100          
220 0         0 local $\; print {*STDERR} "fork error: $!\n";
  0         0  
  0         0  
221             }
222             elsif ( $pid ) { # parent
223 55         1807 $self->{WRK_ID} = $pid;
224 55         2627 $list->set($pid, $self);
225 55 100       7029 $mngd->{on_start}->($pid, $self->{ident}) if $mngd->{on_start};
226             }
227             else { # child
228 10         663 %{ $_LIST } = (), $_SELF = $self;
  10         1490  
229              
230             local $SIG{TERM} = local $SIG{INT} = local $SIG{ABRT} = \&_trap,
231             local $SIG{SEGV} = local $SIG{HUP} = \&_trap,
232 10         1937 local $SIG{QUIT} = \&_quit;
233 10         339 local $SIG{CHLD};
234              
235 10 50       395 MCE::Shared::init() if $INC{'MCE/Shared.pm'};
236 10 50       1030 $_DATA->{ $_SELF->{PKG} }->set('S'.$$, '') unless $self->{IGNORE};
237 10 50       80 CORE::kill($killed, $$) if $killed;
238              
239             # Sets the seed of the base generator uniquely between workers.
240             # The new seed is computed using the current seed and ID value.
241             # One may set the seed at the application level for predictable
242             # results. Ditto for Math::Prime::Util, Math::Random, and
243             # Math::Random::MT::Auto.
244              
245 10         221 srand( abs($_DATA->{"$pkg:seed"} - ($id * 100000)) % 2147483560 );
246              
247 10 50       150 if ( $INC{'Math/Prime/Util.pm'} ) {
248             Math::Prime::Util::srand(
249 0         0 abs($_DATA->{"$pkg:seed"} - ($id * 100000)) % 2147483560
250             );
251             }
252              
253 10 50       125 if ( $INC{'Math/Random.pm'} ) {
254 0         0 my $cur_seed = Math::Random::random_get_seed();
255 0 0       0 my $new_seed = ($cur_seed < 1073741781)
256             ? $cur_seed + ((abs($id) * 10000) % 1073741780)
257             : $cur_seed - ((abs($id) * 10000) % 1073741780);
258              
259 0         0 Math::Random::random_set_seed($new_seed, $new_seed);
260             }
261              
262 10 50       86 if ( $INC{'Math/Random/MT/Auto.pm'} ) {
263 0         0 my $cur_seed = Math::Random::MT::Auto::get_seed()->[0];
264 0 0       0 my $new_seed = ($cur_seed < 1073741781)
265             ? $cur_seed + ((abs($id) * 10000) % 1073741780)
266             : $cur_seed - ((abs($id) * 10000) % 1073741780);
267              
268 0         0 Math::Random::MT::Auto::set_seed($new_seed);
269             }
270              
271 10         496 _dispatch($mngd, $func, \@args);
272             }
273             }
274              
275 55 0 33     2087 $MCE::_GMUTEX->unlock() if ( $_tid && $MCE::_GMUTEX );
276              
277 55 50       303 CORE::kill($killed, $$) if $killed;
278              
279 55 50       5468 return $pid ? $self : undef;
280             }
281              
282             ###############################################################################
283             ## ----------------------------------------------------------------------------
284             ## Public methods.
285             ##
286             ###############################################################################
287              
288             sub equal {
289 0 0 0 0 1 0 return 0 unless ( ref $_[0] && ref $_[1] );
290 0 0       0 $_[0]->{WRK_ID} == $_[1]->{WRK_ID} ? 1 : 0;
291             }
292              
293             sub error {
294 31 50   31 1 423 _croak('Usage: $child->error()') unless ref( my $self = $_[0] );
295 31 50       169 $self->join() unless $self->{REAPED};
296 31 50       310 $self->{ERROR} || undef;
297             }
298              
299             sub exit {
300 10 50 33 10 1 505 shift if ( defined $_[0] && $_[0] eq __PACKAGE__ );
301              
302 10 50       165 my ( $self ) = ( ref $_[0] ? shift : $_SELF );
303 10         100 my ( $pkg, $wrk_id ) = ( $self->{PKG}, $self->{WRK_ID} );
304              
305 10 50 33     240 if ( $wrk_id == $$ && $self->{MGR_ID} eq "$$.$_tid" ) {
    50          
306 0         0 MCE::Child->finish('MCE'); CORE::exit(@_);
  0         0  
307             }
308             elsif ( $wrk_id == $$ ) {
309 0   0     0 alarm 0; my ( $exit_status, @res ) = @_; $? = $exit_status || 0;
  0         0  
  0         0  
310 0 0       0 $_DATA->{$pkg}->set('R'.$wrk_id, @res ? \@res : '');
311 0         0 die "Child exited ($?)\n";
312 0         0 _exit($?); # not reached
313             }
314              
315 10 50       210 return $self if $self->{REAPED};
316              
317 10 50       95 if ( exists $_DATA->{$pkg} ) {
318 10         165 sleep 0.015 until $_DATA->{$pkg}->exists('S'.$wrk_id);
319             } else {
320 0         0 sleep 0.030;
321             }
322              
323 10 50       75 if ($_is_MSWin32) {
324 0 0       0 CORE::kill('KILL', $wrk_id) if CORE::kill('ZERO', $wrk_id);
325             } else {
326 10 50       515 CORE::kill('QUIT', $wrk_id) if CORE::kill('ZERO', $wrk_id);
327             }
328              
329 10         105 $self;
330             }
331              
332             sub finish {
333 26 50   26 1 3425 _croak('Usage: MCE::Child->finish()') if ref($_[0]);
334 26 50 33     257 shift if ( defined $_[0] && $_[0] eq __PACKAGE__ );
335              
336 26 100       293 my $pkg = defined($_[0]) ? $_[0] : caller();
337              
338 26 100       122 if ( $pkg eq 'MCE' ) {
    100          
339 23         71 for my $key ( keys %{ $_LIST } ) { MCE::Child->finish($key); }
  23         194  
  2         19  
340             }
341             elsif ( exists $_LIST->{$pkg} ) {
342 2 50       8 return if $MCE::Signal::KILLED;
343              
344 2 50       11 if ( exists $_DELY->{$pkg} ) {
345 2         14 &_force_reap($pkg);
346             delete($_DELY->{$pkg}), delete($_DATA->{"$pkg:seed"}),
347             delete($_LIST->{$pkg}), delete($_DATA->{"$pkg:id"}),
348 2         146 delete($_MNGD->{$pkg}), delete($_DATA->{ $pkg });
349             }
350             }
351              
352 26         77 @_ = ();
353              
354 26         1828 return;
355             }
356              
357             sub is_joinable {
358 24 50   24 1 96 _croak('Usage: $child->is_joinable()') unless ref( my $self = $_[0] );
359 24         72 my ( $wrk_id, $pkg ) = ( $self->{WRK_ID}, $self->{PKG} );
360              
361 24 50       128 if ( $wrk_id == $$ ) {
    50          
362 0         0 '';
363             }
364             elsif ( $self->{MGR_ID} eq "$$.$_tid" ) {
365 24 50       56 return '' if $self->{REAPED};
366 24         80 local $!; $_DATA->{$pkg}->reap_data;
  24         72  
367 24 50       392 ( waitpid($wrk_id, _WNOHANG) == 0 ) ? '' : do {
368 0 0       0 _reap_child($self, 0) unless $self->{REAPED};
369 0         0 1;
370             };
371             }
372             else {
373             # limitation for MCE::Child only; allowed for MCE::Hobo
374 0         0 _croak('Error: $child->is_joinable() not called by managed process');
375             }
376             }
377              
378             sub is_running {
379 24 50   24 1 10776 _croak('Usage: $child->is_running()') unless ref( my $self = $_[0] );
380 24         80 my ( $wrk_id, $pkg ) = ( $self->{WRK_ID}, $self->{PKG} );
381              
382 24 50       144 if ( $wrk_id == $$ ) {
    50          
383 0         0 1;
384             }
385             elsif ( $self->{MGR_ID} eq "$$.$_tid" ) {
386 24 50       80 return '' if $self->{REAPED};
387 24         72 local $!; $_DATA->{$pkg}->reap_data;
  24         104  
388 24 50       440 ( waitpid($wrk_id, _WNOHANG) == 0 ) ? 1 : do {
389 0 0       0 _reap_child($self, 0) unless $self->{REAPED};
390 0         0 '';
391             };
392             }
393             else {
394             # limitation for MCE::Child only; allowed for MCE::Hobo
395 0         0 _croak('Error: $child->is_running() not called by managed process');
396             }
397             }
398              
399             sub join {
400 41 50   41 1 22776 _croak('Usage: $child->join()') unless ref( my $self = $_[0] );
401 41         176 my ( $wrk_id, $pkg ) = ( $self->{WRK_ID}, $self->{PKG} );
402              
403 41 50       109 if ( $self->{REAPED} ) {
404 0 0       0 _croak('Child already joined') unless exists( $self->{RESULT} );
405 0 0       0 $_LIST->{$pkg}->del($wrk_id) if ( exists $_LIST->{$pkg} );
406              
407             return ( defined wantarray )
408 0 0       0 ? wantarray ? @{ delete $self->{RESULT} } : delete( $self->{RESULT} )->[-1]
  0 0       0  
409             : ();
410             }
411              
412 41 50       352 if ( $wrk_id == $$ ) {
    50          
413 0         0 _croak('Cannot join self');
414             }
415             elsif ( $self->{MGR_ID} eq "$$.$_tid" ) {
416             # remove from list after reaping
417 41         651 local $SIG{CHLD};
418 41         443 _reap_child($self, 1);
419 41         219 $_LIST->{$pkg}->del($wrk_id);
420             }
421             else {
422             # limitation for MCE::Child only; allowed for MCE::Hobo
423 0         0 _croak('Error: $child->join() not called by managed process');
424             }
425              
426 41 50       162 return unless ( exists $self->{RESULT} );
427              
428             ( defined wantarray )
429 41 50       508 ? wantarray ? @{ delete $self->{RESULT} } : delete( $self->{RESULT} )->[-1]
  0 100       0  
430             : ();
431             }
432              
433             sub kill {
434 5 50   5 1 155 _croak('Usage: $child->kill()') unless ref( my $self = $_[0] );
435 5         65 my ( $wrk_id, $pkg, $signal ) = ( $self->{WRK_ID}, $self->{PKG}, $_[1] );
436              
437 5 50       30 if ( $wrk_id == $$ ) {
438 0   0     0 CORE::kill($signal || 'INT', $$);
439 0         0 return $self;
440             }
441 5 50       55 if ( $self->{MGR_ID} eq "$$.$_tid" ) {
442 5 50       20 return $self if $self->{REAPED};
443 5 50       20 if ( exists $_DATA->{$pkg} ) {
444 5         25 sleep 0.015 until $_DATA->{$pkg}->exists('S'.$wrk_id);
445             } else {
446 0         0 sleep 0.030;
447             }
448             }
449              
450 5 50 50     225 CORE::kill($signal || 'INT', $wrk_id) if CORE::kill('ZERO', $wrk_id);
451              
452 5         30 $self;
453             }
454              
455             sub list {
456 8 50   8 1 5688 _croak('Usage: MCE::Child->list()') if ref($_[0]);
457 8         88 my $pkg = "$$.$_tid.".caller();
458              
459 8 50       48 ( exists $_LIST->{$pkg} ) ? $_LIST->{$pkg}->vals() : ();
460             }
461              
462             sub list_pids {
463 8 50   8 1 408 _croak('Usage: MCE::Child->list_pids()') if ref($_[0]);
464 8         264 my $pkg = "$$.$_tid.".caller(); local $_;
  8         72  
465              
466 8 50       304 ( exists $_LIST->{$pkg} ) ? map { $_->pid } $_LIST->{$pkg}->vals() : ();
  24         152  
467             }
468              
469             sub list_joinable {
470 8 50   8 1 6104 _croak('Usage: MCE::Child->list_joinable()') if ref($_[0]);
471 8         56 my $pkg = "$$.$_tid.".caller();
472              
473 8 50       104 return () unless ( my $list = $_LIST->{$pkg} );
474 8         48 local ($!, $?, $_);
475              
476 8         536 $_DATA->{$pkg}->reap_data;
477              
478             map {
479 8 50       48 ( waitpid($_->{WRK_ID}, _WNOHANG) == 0 ) ? () : do {
  24         280  
480 0 0       0 _reap_child($_, 0) unless $_->{REAPED};
481 0         0 $_;
482             };
483             }
484             $list->vals();
485             }
486              
487             sub list_running {
488 8 50   8 1 16496 _croak('Usage: MCE::Child->list_running()') if ref($_[0]);
489 8         56 my $pkg = "$$.$_tid.".caller();
490              
491 8 50       88 return () unless ( my $list = $_LIST->{$pkg} );
492 8         136 local ($!, $?, $_);
493              
494 8         184 $_DATA->{$pkg}->reap_data;
495              
496             map {
497 8 50       112 ( waitpid($_->{WRK_ID}, _WNOHANG) == 0 ) ? $_ : do {
  24         312  
498 0 0       0 _reap_child($_, 0) unless $_->{REAPED};
499 0         0 ();
500             };
501             }
502             $list->vals();
503             }
504              
505             sub max_workers {
506 17 50   17 1 51 _croak('Usage: MCE::Child->max_workers()') if ref($_[0]);
507 17   33     64 my $mngd = $_MNGD->{ "$$.$_tid.".caller() } || do {
508             # construct mngd internally on first use unless defined
509             init(); $_MNGD->{ "$$.$_tid.".caller() };
510             };
511 17 50       30 shift if ( $_[0] eq __PACKAGE__ );
512              
513 17 100       28 $mngd->{max_workers} = _max_workers(shift) if @_;
514 17         50 $mngd->{max_workers};
515             }
516              
517             sub pending {
518 8 50   8 1 5384 _croak('Usage: MCE::Child->pending()') if ref($_[0]);
519 8         48 my $pkg = "$$.$_tid.".caller();
520              
521 8 50       48 ( exists $_LIST->{$pkg} ) ? $_LIST->{$pkg}->len() : 0;
522             }
523              
524             sub pid {
525 31 50   31 1 244 ref($_[0]) ? $_[0]->{WRK_ID} : $_SELF->{WRK_ID};
526             }
527              
528             sub result {
529 7 50   7 1 139 _croak('Usage: $child->result()') unless ref( my $self = $_[0] );
530 7 50       30 return $self->join() unless $self->{REAPED};
531              
532 7 50       50 _croak('Child already joined') unless exists( $self->{RESULT} );
533 7 50       55 wantarray ? @{ delete $self->{RESULT} } : delete( $self->{RESULT} )->[-1];
  0         0  
534             }
535              
536             sub self {
537 0 0   0 1 0 ref($_[0]) ? $_[0] : $_SELF;
538             }
539              
540             sub wait_all {
541 1 50   1 1 62 _croak('Usage: MCE::Child->wait_all()') if ref($_[0]);
542 1         55 my $pkg = "$$.$_tid.".caller();
543              
544             return wantarray ? () : 0
545 1 0 33     53 if ( !exists $_LIST->{$pkg} || !$_LIST->{$pkg}->len() );
    50          
546              
547 1         14 local $_; ( wantarray )
548 0         0 ? map { $_->join(); $_ } $_LIST->{$pkg}->vals()
  0         0  
549 1 50       152 : map { $_->join(); () } $_LIST->{$pkg}->vals();
  2         36  
  2         10  
550             }
551              
552             *waitall = \&wait_all; # compatibility
553              
554             sub wait_one {
555 4 50   4 1 252 _croak('Usage: MCE::Child->wait_one()') if ref($_[0]);
556 4         168 my $pkg = "$$.$_tid.".caller();
557              
558             return undef
559 4 50 33     204 if ( !exists $_LIST->{$pkg} || !$_LIST->{$pkg}->len() );
560              
561 4         144 _wait_one($pkg);
562             }
563              
564             *waitone = \&wait_one; # compatibility
565              
566             sub yield {
567 0 0   0 1 0 _croak('Usage: MCE::Child->yield()') if ref($_[0]);
568 0 0 0     0 shift if ( defined $_[0] && $_[0] eq __PACKAGE__ );
569              
570 0   0     0 my $pkg = $_SELF->{PKG} || do {
571             my $mngd = $_MNGD->{ "$$.$_tid.".caller() } || do {
572             # construct mngd internally on first use unless defined
573             init(); $_MNGD->{ "$$.$_tid.".caller() };
574             };
575             $mngd->{PKG};
576             };
577              
578 0 0       0 return unless $_DELY->{$pkg};
579 0         0 my $seconds = $_DELY->{$pkg}->seconds(@_);
580              
581 0         0 MCE::Util::_sleep( $seconds );
582             }
583              
584             ###############################################################################
585             ## ----------------------------------------------------------------------------
586             ## Private methods.
587             ##
588             ###############################################################################
589              
590             sub _croak {
591 0 0   0   0 if ( $INC{'MCE.pm'} ) {
592 0         0 goto &MCE::_croak;
593             }
594             else {
595 0         0 $SIG{__DIE__} = \&MCE::Signal::_die_handler;
596 0         0 $SIG{__WARN__} = \&MCE::Signal::_warn_handler;
597              
598 0         0 $\ = undef; goto &Carp::croak;
  0         0  
599             }
600             }
601              
602             sub _dispatch {
603 10     10   152 my ( $mngd, $func, $args ) = @_;
604              
605 10         260 $mngd->{WRK_ID} = $_SELF->{WRK_ID} = $$, $? = 0;
606 10 50       150 $ENV{PERL_MCE_IPC} = 'win32' if $_is_MSWin32;
607              
608             {
609 10         94 local $!;
  10         223  
610 10 50       719 (*STDERR)->autoflush(1) if defined( fileno *STDERR );
611 9 50       2297 (*STDOUT)->autoflush(1) if defined( fileno *STDOUT );
612             }
613              
614             # Run task.
615             my $child_timeout = ( exists $_SELF->{child_timeout} )
616 9 50       394 ? $_SELF->{child_timeout} : $mngd->{child_timeout};
617              
618             my $void_context = ( exists $_SELF->{void_context} )
619 9 50       52 ? $_SELF->{void_context} : $mngd->{void_context};
620              
621 9         35 my @res; my $timed_out = 0;
  9         34  
622              
623             local $SIG{'ALRM'} = sub {
624 0     0   0 alarm 0; $timed_out = 1; $SIG{__WARN__} = sub {};
  0         0  
  0         0  
625 0         0 die "Child timed out\n";
626 9         529 };
627              
628 9 50 33     337 if ( $void_context || $_SELF->{IGNORE} ) {
629 12     12   96 no strict 'refs';
  12         23  
  12         900  
630 0   0     0 eval { alarm($child_timeout || 0); $func->(@{ $args }) };
  0         0  
  0         0  
  0         0  
631             }
632             else {
633 12     12   70 no strict 'refs';
  12         474  
  12         2348  
634 9   50     93 @res = eval { alarm($child_timeout || 0); $func->(@{ $args }) };
  9         869  
  9         77  
  8         112  
635             }
636              
637 7         121 alarm 0;
638 7 50       66 $@ = "Child timed out" if $timed_out;
639              
640 7 50       69 if ( $@ ) {
641 0 0       0 _exit($?) if ( $@ =~ /^Child exited \(\S+\)$/ );
642 0         0 my $err = $@; $? = 1; $err =~ s/, <__ANONIO__> line \d+//;
  0         0  
  0         0  
643              
644 0 0       0 if ( ! $_SELF->{IGNORE} ) {
645             $_DATA->{ $_SELF->{PKG} }->set('S'.$$, $err),
646 0         0 $_DATA->{ $_SELF->{PKG} }->set('R'.$$, '');
647             }
648              
649 0 0 0     0 if ( !$timed_out && !$mngd->{on_finish} && !$INC{'MCE/Simple.pm'} ) {
      0        
650 12     12   7528 use bytes; warn "Child $$ terminated abnormally: reason $err\n";
  12         178  
  12         60  
  0         0  
651             }
652             }
653             else {
654 7 50       52 shift(@res) if ref($res[0]) =~ /^MCE::(?:Barrier|Semaphore)::_guard/s;
655             $_DATA->{ $_SELF->{PKG} }->set('R'.$$, @res ? \@res : '')
656 7 50       271 if ( ! $_SELF->{IGNORE} );
    50          
657             }
658              
659 7         108 _exit($?);
660             }
661              
662             sub _exit {
663 10     10   72 my ( $exit_status ) = @_;
664              
665             # Check for nested workers not yet joined.
666 10 50 66     156 MCE::Child->finish('MCE') if ( !$_SELF->{SIGNALED} && keys %{ $_LIST } );
  7         74  
667              
668             # Exit child process.
669 10 50   0   333 $SIG{__DIE__} = sub {} unless $_tid;
670 10     0   262 $SIG{__WARN__} = sub {};
671              
672 10 50 33     130 threads->exit($exit_status) if ( $INC{'threads.pm'} && $_is_MSWin32 );
673              
674             my $posix_exit = ( exists $_SELF->{posix_exit} )
675 10 50       109 ? $_SELF->{posix_exit} : $_MNGD->{ $_SELF->{PKG} }{posix_exit};
676              
677 10 0 33     118 if ( $posix_exit && !$_SELF->{SIGNALED} && !$_is_MSWin32 ) {
      33        
678 0         0 eval { MCE::Mutex::Channel::_destroy() };
  0         0  
679 0 0       0 POSIX::_exit($exit_status) if $INC{'POSIX.pm'};
680 0         0 CORE::kill('KILL', $$);
681             }
682              
683 10         4565 CORE::exit($exit_status);
684             }
685              
686             sub _force_reap {
687 9     9   39 my ( $count, $pkg ) = ( 0, @_ );
688 9 50 33     96 return unless ( exists $_LIST->{$pkg} && $_LIST->{$pkg}->len() );
689              
690 0         0 for my $child ( $_LIST->{$pkg}->vals() ) {
691 0 0       0 next if $child->{IGNORE};
692              
693 0 0       0 if ( $child->is_running() ) {
694 0 0       0 sleep(0.015), CORE::kill('KILL', $child->pid())
695             if CORE::kill('ZERO', $child->pid());
696 0         0 $count++;
697             }
698             }
699              
700 0         0 $_LIST->{$pkg}->clear();
701              
702 0 0 0     0 warn "Finished with active child processes [$pkg] ($count)\n"
703             if ( $count && !$_is_MSWin32 );
704              
705 0         0 return;
706             }
707              
708             sub _quit {
709 3 50   3   2844 return MCE::Signal::defer($_[0]) if $MCE::Signal::IPC;
710              
711 3         25 alarm 0; my ( $name ) = @_;
  3         20  
712 3         100 $_SELF->{SIGNALED} = 1, $name =~ s/^SIG//;
713              
714       0     $SIG{$name} = sub {}, CORE::kill($name, -$$)
715 3 50       161 if ( exists $SIG{$name} );
716              
717 3 50       19 if ( ! $_SELF->{IGNORE} ) {
718 3         28 my ( $pkg, $wrk_id ) = ( $_SELF->{PKG}, $_SELF->{WRK_ID} );
719 3         28 $_DATA->{$pkg}->set('R'.$wrk_id, '');
720             }
721              
722 3         23 _exit(0);
723             }
724              
725             sub _reap_child {
726 47     47   222 my ( $child, $wait_flag ) = @_;
727 47 50       209 return unless $child;
728              
729 47         371 local @_ = $_DATA->{ $child->{PKG} }->get( $child->{WRK_ID}, $wait_flag );
730              
731 47 100 50     1373 ( $child->{ERROR}, $child->{RESULT}, $child->{REAPED} ) =
732             ( pop || '', length $_[0] ? pop : [], 1 );
733              
734 47 50       252 return if $child->{IGNORE};
735              
736 47   50     640 my ( $exit, $err ) = ( $? || 0, $child->{ERROR} );
737 47         185 my ( $code, $sig ) = ( $exit >> 8, $exit & 0x7f );
738              
739 47 50 33     184 if ( $code > 100 && !$err ) {
740 0 0       0 $code = 2, $sig = 1, $err = 'Child received SIGHUP' if $code == 101;
741 0 0       0 $code = 2, $sig = 2, $err = 'Child received SIGINT' if $code == 102;
742 0 0       0 $code = 2, $sig = 6, $err = 'Child received SIGABRT' if $code == 106;
743 0 0       0 $code = 2, $sig = 11, $err = 'Child received SIGSEGV' if $code == 111;
744 0 0       0 $code = 2, $sig = 15, $err = 'Child received SIGTERM' if $code == 115;
745              
746 0         0 $child->{ERROR} = $err;
747             }
748              
749 47 100       295 if ( my $on_finish = $_MNGD->{ $child->{PKG} }{on_finish} ) {
750             $on_finish->(
751             $child->{WRK_ID}, $code, $child->{ident}, $sig, $err,
752 8         61 @{ $child->{RESULT} }
  8         192  
753             );
754             }
755              
756 47         295 return;
757             }
758              
759             sub _trap {
760 0 0   0   0 return MCE::Signal::defer($_[0]) if $MCE::Signal::IPC;
761              
762 0         0 alarm 0; my ( $exit_status, $name ) = ( 2, @_ );
  0         0  
763 0         0 $_SELF->{SIGNALED} = 1, $name =~ s/^SIG//;
764              
765       0     $SIG{$name} = sub {}, CORE::kill($name, -$$)
766 0 0       0 if ( exists $SIG{$name} );
767              
768 0 0       0 if ( $name eq 'HUP' ) { $exit_status = 101 }
  0 0       0  
    0          
    0          
    0          
769 0         0 elsif ( $name eq 'INT' ) { $exit_status = 102 }
770 0         0 elsif ( $name eq 'ABRT' ) { $exit_status = 106 }
771 0         0 elsif ( $name eq 'SEGV' ) { $exit_status = 111 }
772 0         0 elsif ( $name eq 'TERM' ) { $exit_status = 115 }
773              
774 0 0       0 if ( ! $_SELF->{IGNORE} ) {
775 0         0 my ( $pkg, $wrk_id ) = ( $_SELF->{PKG}, $_SELF->{WRK_ID} );
776 0         0 $_DATA->{$pkg}->set('R'.$wrk_id, '');
777             }
778              
779 0         0 _exit($exit_status);
780             }
781              
782             sub _wait_one {
783 4     4   56 my ( $pkg ) = @_;
784 4         20 my ( $list, $self, $wrk_id ) = ( $_LIST->{$pkg} ); local $!;
  4         160  
785              
786 4         40 while () {
787 196         2524 $_DATA->{$pkg}->reap_data;
788 196         748 for my $child ( $list->vals() ) {
789 196         468 $wrk_id = $child->{WRK_ID};
790 196 50       568 return $list->del($wrk_id) if $child->{REAPED};
791 196 100       2080 $self = $list->del($wrk_id), last if waitpid($wrk_id, _WNOHANG);
792             }
793 196 100       696 last if $self;
794 192         5785808 sleep 0.030;
795             }
796              
797 4         80 _reap_child($self, 0);
798              
799 4         28 $self;
800             }
801              
802             ###############################################################################
803             ## ----------------------------------------------------------------------------
804             ## Delay implementation suited for MCE::Child.
805             ##
806             ###############################################################################
807              
808             package # hide from rpm
809             MCE::Child::_delay;
810              
811             sub new {
812 12     12   47 my ( $class, $chnl, $delay ) = @_;
813              
814 12 50       47 if ( !defined $delay ) {
815 12 50       95 $delay = ($^O =~ /mswin|mingw|msys|cygwin/i) ? 0.015 : 0.008;
816             }
817              
818 12         58 $chnl->send(undef);
819              
820 12         70 bless [ $delay, $chnl ], $class;
821             }
822              
823             sub seconds {
824 0     0   0 my ( $self, $how_long ) = @_;
825 0 0       0 my $delay = defined($how_long) ? $how_long : $self->[0];
826 0         0 my $lapse = $self->[1]->recv();
827 0         0 my $time = MCE::Util::_time();
828              
829 0 0 0     0 if ( !$delay || !defined $lapse ) {
    0          
830 0         0 $lapse = $time;
831             }
832             elsif ( $lapse + $delay - $time < 0 ) {
833 0         0 $lapse += int( abs($time - $lapse) / $delay + 0.5 ) * $delay;
834             }
835              
836 0         0 $self->[1]->send( $lapse += $delay );
837              
838 0         0 return $lapse - $time;
839             }
840              
841             ###############################################################################
842             ## ----------------------------------------------------------------------------
843             ## Hash and ordhash implementations suited for MCE::Child.
844             ##
845             ###############################################################################
846              
847             package # hide from rpm
848             MCE::Child::_hash;
849              
850 12     12   19965 use Time::HiRes 'sleep';
  12         12  
  12         71  
851              
852             use constant {
853 12 50       16301 _WNOHANG => ( $INC{'POSIX.pm'} )
    50          
854             ? &POSIX::WNOHANG : ( $^O eq 'solaris' ) ? 64 : 1
855 12     12   1781 };
  12         24  
856              
857             sub new {
858 12     12   69 my ( $class, $chnl ) = @_;
859              
860 12         58 bless [ {}, $chnl ], shift;
861             }
862              
863             sub clear {
864 7     7   22 my ( $self ) = @_;
865              
866 7         40 1 while ( $self->[1]->recv2_nb() );
867              
868 7         18 %{ $self->[0] } = ();
  7         23  
869             }
870              
871             sub exists {
872 15     15   35 my ( $self, $key ) = @_;
873              
874 15         165 while ( my $data = $self->[1]->recv2_nb() ) {
875 10         115 $self->[0]{ $data->[0] } = $data->[1];
876             }
877              
878 15         135 CORE::exists $self->[0]{ $key };
879             }
880              
881             sub get {
882 47     47   152 my ( $self, $wrk_id, $wait_flag ) = @_;
883              
884 47 100       226 if ( !CORE::exists $self->[0]{ 'R'.$wrk_id } ) {
885 28         178 while ( my $data = $self->[1]->recv2_nb() ) {
886 33         202 $self->[0]{ $data->[0] } = $data->[1];
887             }
888             }
889              
890 47 100       148 if ( $wait_flag ) {
891 41         353 local $!;
892 41 100       7135270 ( CORE::exists $self->[0]{ 'R'.$wrk_id } ) ? waitpid($wrk_id, 0) : do {
893 9         18 while () {
894 20         139 my $data = $self->[1]->recv2_nb();
895 20 100       338 if ( !defined $data ) {
896 10 50       104 last if waitpid($wrk_id, _WNOHANG);
897 10         10630 sleep(0.0009), next;
898             }
899 10         47 $self->[0]{ $data->[0] } = $data->[1];
900 10 100       12799960 waitpid($wrk_id, 0), last if $data->[0] eq 'R'.$wrk_id;
901             }
902 9 50       206 if ( !CORE::exists $self->[0]{ 'R'.$wrk_id } ) {
903 0         0 while ( my $data = $self->[1]->recv2_nb() ) {
904 0         0 $self->[0]{ $data->[0] } = $data->[1];
905             }
906             }
907             };
908             }
909              
910 47         424 my $result = delete $self->[0]{ 'R'.$wrk_id };
911 47         361 my $error = delete $self->[0]{ 'S'.$wrk_id };
912              
913 47 50       335 $result = '' unless ( defined $result );
914 47 50       131 $error = '' unless ( defined $error );
915              
916 47         302 return ( $result, $error );
917             }
918              
919             sub reap_data {
920 325     325   975 my ( $self ) = @_;
921              
922 325 100       1005 if (wantarray) {
923 65         147 my @ret;
924 65         701 while ( my $data = $self->[1]->recv2_nb() ) {
925 19 100       304 push @ret, substr($data->[0], 1) if substr($data->[0], 0, 1) eq 'R';
926 19         283 $self->[0]{ $data->[0] } = $data->[1];
927             }
928 65         1095 return @ret;
929             }
930              
931 260         1512 while ( my $data = $self->[1]->recv2_nb() ) {
932 24         140 $self->[0]{ $data->[0] } = $data->[1];
933             }
934              
935 260         568 return;
936             }
937              
938             sub set {
939 20     20   740 $_[0]->[1]->send2([ $_[1], $_[2] ]);
940             }
941              
942             package # hide from rpm
943             MCE::Child::_ordhash;
944              
945 12     12   59 sub new { bless [ {}, [], {}, 0 ], shift; } # data, keys, indx, gcnt
946 0     0   0 sub exists { CORE::exists $_[0]->[0]{ $_[1] }; }
947 0     0   0 sub get { $_[0]->[0]{ $_[1] }; }
948 22     22   142 sub len { scalar keys %{ $_[0]->[0] }; }
  22         299  
949              
950             sub clear {
951 0     0   0 my ( $self ) = @_;
952 0         0 %{ $self->[0] } = @{ $self->[1] } = %{ $self->[2] } = (), $self->[3] = 0;
  0         0  
  0         0  
  0         0  
953              
954 0         0 return;
955             }
956              
957             sub del {
958 47     47   209 my ( $self, $key ) = @_;
959 47 50       306 return undef unless defined( my $off = delete $self->[2]{$key} );
960              
961             # tombstone
962 47         149 $self->[1][$off] = undef;
963              
964             # GC keys and refresh index
965 47 100       152 if ( ++$self->[3] > @{ $self->[1] } * 0.667 ) {
  47         295  
966 18         78 my ( $keys, $indx ) = ( $self->[1], $self->[2] );
967 18         44 my $i; $i = $self->[3] = 0;
  18         57  
968 18         43 for my $k ( @{ $keys } ) {
  18         192  
969 46 50       197 $keys->[$i] = $k, $indx->{$k} = $i++ if defined($k);
970             }
971 18         71 splice @{ $keys }, $i;
  18         77  
972             }
973              
974 47         1048 delete $self->[0]{$key};
975             }
976              
977             sub set {
978 55     55   1219 my ( $self, $key ) = @_;
979 55 50       925 $self->[0]{$key} = $_[2], return 1 if exists($self->[0]{$key});
980              
981 55         205 $self->[2]{$key} = @{ $self->[1] }; push @{ $self->[1] }, $key;
  55         1645  
  55         223  
  55         1180  
982 55         683 $self->[0]{$key} = $_[2];
983              
984 55         274 return 1;
985             }
986              
987             sub vals {
988 229     229   585 my ( $self ) = @_;
989             $self->[3]
990 1         7 ? @{ $self->[0] }{ grep defined($_), @{ $self->[1] } }
  1         15  
991 229 100       635 : @{ $self->[0] }{ @{ $self->[1] } };
  228         1136  
  228         500  
992             }
993              
994             1;
995              
996             __END__