File Coverage

blib/lib/MCE/Child.pm
Criterion Covered Total %
statement 372 521 71.4
branch 176 406 43.3
condition 51 161 31.6
subroutine 55 74 74.3
pod 23 23 100.0
total 677 1185 57.1


line stmt bran cond sub pod time code
1             ###############################################################################
2             ## ----------------------------------------------------------------------------
3             ## A threads-like parallelization module compatible with Perl 5.8.
4             ##
5             ###############################################################################
6              
7 12     12   922673 use strict;
  12         123  
  12         361  
8 12     12   60 use warnings;
  12         12  
  12         360  
9              
10 12     12   72 no warnings qw( threads recursion uninitialized once redefine );
  12         25  
  12         901  
11              
12             package MCE::Child;
13              
14             our $VERSION = '1.888';
15              
16             ## no critic (BuiltinFunctions::ProhibitStringyEval)
17             ## no critic (Subroutines::ProhibitExplicitReturnUndef)
18             ## no critic (Subroutines::ProhibitSubroutinePrototypes)
19             ## no critic (TestingAndDebugging::ProhibitNoStrict)
20              
21 12     12   5727 use MCE::Signal ();
  12         35  
  12         471  
22 12     12   5370 use MCE::Mutex ();
  12         24  
  12         235  
23 12     12   6283 use MCE::Channel ();
  12         35  
  12         342  
24 12     12   84 use Time::HiRes 'sleep';
  12         24  
  12         60  
25              
26             use overload (
27             q(==) => \&equal,
28 0     0   0 q(!=) => sub { !equal(@_) },
29 12         150 fallback => 1
30 12     12   17078 );
  12         11682  
31              
32             sub import {
33 12 50   12   143 if (caller !~ /^MCE::/) {
34 12     12   1402 no strict 'refs'; no warnings 'redefine';
  12     12   101  
  12         546  
  12         70  
  12         35  
  12         1296  
35 12         23 *{ caller().'::mce_child' } = \&mce_child;
  12         72  
36             }
37 12         143 return;
38             }
39              
40             ## The POSIX module has many symbols. Try not loading it simply
41             ## to have WNOHANG. The following covers most platforms.
42              
43             use constant {
44 12 50       60351 _WNOHANG => ( $INC{'POSIX.pm'} )
    50          
45             ? &POSIX::WNOHANG : ( $^O eq 'solaris' ) ? 64 : 1
46 12     12   84 };
  12         25  
47              
48             my ( $_MNGD, $_DATA, $_DELY, $_LIST ) = ( {}, {}, {}, {} );
49              
50             my $_is_MSWin32 = ( $^O eq 'MSWin32' ) ? 1 : 0;
51             my $_tid = ( $INC{'threads.pm'} ) ? threads->tid() : 0;
52              
53             sub CLONE {
54 0 0   0   0 $_tid = threads->tid(), &_clear() if $INC{'threads.pm'};
55             }
56              
57             sub _clear {
58 0     0   0 %{ $_LIST } = ();
  0         0  
59             }
60              
61             sub _max_workers {
62 10     10   20 my ( $cpus ) = @_;
63 10 100       69 if ( $cpus eq 'auto' ) {
    100          
64 1         4 $cpus = MCE::Util::get_ncpu();
65             }
66             elsif ( $cpus =~ /^([0-9.]+)%$/ ) {
67 6         34 my ( $percent, $ncpu ) = ( $1 / 100, MCE::Util::get_ncpu() );
68 6         28 $cpus = $ncpu * $percent + 0.5;
69             }
70 10 100 66     107 $cpus = 1 if $cpus !~ /^[\d\.]+$/ || $cpus < 1;
71 10         27 return int($cpus);
72             }
73              
74             ###############################################################################
75             ## ----------------------------------------------------------------------------
76             ## Init routine.
77             ##
78             ###############################################################################
79              
80             bless my $_SELF = { MGR_ID => "$$.$_tid", WRK_ID => $$ }, __PACKAGE__;
81              
82             sub init {
83 19 100 66 19 1 4385 shift if ( defined $_[0] && $_[0] eq __PACKAGE__ );
84              
85             # -- options ----------------------------------------------------------
86             # max_workers child_timeout posix_exit on_start on_finish void_context
87             # ---------------------------------------------------------------------
88              
89 19 100       219 my $pkg = "$$.$_tid.".( caller eq __PACKAGE__ ? caller(1) : caller );
90 19 50       152 my $mngd = $_MNGD->{$pkg} = ( ref $_[0] eq 'HASH' ) ? shift : { @_ };
91              
92 19         55 @_ = ();
93              
94             $mngd->{MGR_ID} = "$$.$_tid", $mngd->{PKG} = $pkg,
95 19         132 $mngd->{WRK_ID} = $$;
96              
97 19 100       126 &_force_reap($pkg), $_DATA->{$pkg}->clear() if ( exists $_LIST->{$pkg} );
98              
99 19 100       84 if ( !exists $_LIST->{$pkg} ) {
100 12 0 33     36 $MCE::_GMUTEX->lock() if ( $_tid && $MCE::_GMUTEX );
101 12 50       25 sleep 0.015 if $_tid;
102              
103             # Start the shared-manager process if not running.
104 12 50       36 MCE::Shared->start() if $INC{'MCE/Shared.pm'};
105              
106 12         61 my $chnl = MCE::Channel->new( impl => 'Mutex' );
107 12         117 $_LIST->{ $pkg } = MCE::Child::_ordhash->new();
108 12         94 $_DELY->{ $pkg } = MCE::Child::_delay->new( $chnl );
109 12         107 $_DATA->{ $pkg } = MCE::Child::_hash->new( $chnl );
110 12         570 $_DATA->{"$pkg:seed"} = int(rand() * 1e9);
111 12         159 $_DATA->{"$pkg:id" } = 0;
112              
113 12 0 33     71 $MCE::_GMUTEX->unlock() if ( $_tid && $MCE::_GMUTEX );
114             }
115              
116 19 50       96 if ( !exists $mngd->{posix_exit} ) {
117             $mngd->{posix_exit} = 1 if (
118             $^S || $_tid || $INC{'Mojo/IOLoop.pm'} ||
119             $INC{'Coro.pm'} || $INC{'LWP/UserAgent.pm'} || $INC{'stfl.pm'} ||
120             $INC{'Curses.pm'} || $INC{'CGI.pm'} || $INC{'FCGI.pm'} ||
121             $INC{'Tk.pm'} || $INC{'Wx.pm'} || $INC{'Win32/GUI.pm'} ||
122 19 50 33     1177 $INC{'Gearman/Util.pm'} || $INC{'Gearman/XS.pm'}
      33        
      33        
      33        
      33        
      33        
      33        
      33        
      33        
      33        
      33        
      33        
      33        
123             );
124             }
125              
126 19 100       85 if ( defined $mngd->{max_workers} ) {
127 3         8 $mngd->{max_workers} = _max_workers($mngd->{max_workers});
128             }
129              
130 19 50 33     73 if ( $INC{'LWP/UserAgent.pm'} && !$INC{'Net/HTTP.pm'} ) {
131 0         0 local $@; eval 'require Net::HTTP; require Net::HTTPS';
  0         0  
132             }
133              
134             require POSIX
135 19 50 66     5243 if ( $mngd->{on_finish} && !$INC{'POSIX.pm'} && !$_is_MSWin32 );
      66        
136              
137 19         38891 return;
138             }
139              
140             ###############################################################################
141             ## ----------------------------------------------------------------------------
142             ## 'new', 'mce_child', and 'create' for threads-like similarity.
143             ##
144             ###############################################################################
145              
146             ## 'new' and 'tid' are aliases for 'create' and 'pid' respectively.
147              
148             *new = \&create, *tid = \&pid;
149              
150             ## Use "goto" trick to avoid pad problems from 5.8.1 (fixed in 5.8.2)
151             ## Tip found in threads::async.
152              
153             sub mce_child (&;@) {
154 0     0 1 0 goto &create;
155             }
156              
157             sub create {
158 65   66 65 1 22662 my $mngd = $_MNGD->{ "$$.$_tid.".caller() } || do {
159             # construct mngd internally on first use unless defined
160             init(); $_MNGD->{ "$$.$_tid.".caller() };
161             };
162              
163 65 50       389 shift if ( $_[0] eq __PACKAGE__ );
164              
165             # ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~
166              
167 65 50       827 my $self = bless ref $_[0] eq 'HASH' ? { %{ shift() } } : { }, __PACKAGE__;
  0         0  
168              
169 65 50       606 $self->{IGNORE} = 1 if $SIG{CHLD} eq 'IGNORE';
170 65         1153 $self->{MGR_ID} = $mngd->{MGR_ID}, $self->{PKG} = $mngd->{PKG};
171 65 50 33     361 $self->{ident } = shift if ( !ref $_[0] && ref $_[1] eq 'CODE' );
172              
173 65 0 33     140 my $func = shift; $func = caller().'::'.$func
  65   33     289  
174             if ( !ref $func && length $func && index($func,':') < 0 );
175              
176 65 50       245 if ( !defined $func ) {
177 0         0 local $\; print {*STDERR} "code function is not specified or valid\n";
  0         0  
  0         0  
178 0         0 return undef;
179             }
180              
181             my ( $list, $max_workers, $pkg ) = (
182             $_LIST->{ $mngd->{PKG} }, $mngd->{max_workers}, $mngd->{PKG}
183 65         257 );
184              
185 65 50       319 $_DATA->{"$pkg:id"} = 10000 if ( ( my $id = ++$_DATA->{"$pkg:id"} ) >= 2e9 );
186              
187             {
188             # Reap completed child processes.
189 65         114 local ($SIG{CHLD}, $!, $?, $_); map {
  65         2957  
190 0         0 waitpid($_, 0); _reap_child($list->del($_), 0); ();
  0         0  
  0         0  
191             }
192 65         775 $_DATA->{$pkg}->reap_data;
193             }
194              
195 65 50 33     1140 if ( $max_workers || $self->{IGNORE} ) {
196             # Wait for a slot if saturated.
197 0 0 0     0 if ( $max_workers && $list->len() >= $max_workers ) {
198 0         0 my $count = $list->len() - $max_workers + 1;
199 0         0 _wait_one($pkg) for 1 .. $count;
200             }
201             }
202              
203             # ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~
204              
205 65 0 33     332 $MCE::_GMUTEX->lock() if ( $_tid && $MCE::_GMUTEX );
206              
207 65         339 my @args = @_; @_ = (); # To avoid (Scalars leaked: N) messages
  65         145  
208 65         123 my ( $killed, $pid );
209              
210             {
211 65     0   96 local $SIG{TERM} = local $SIG{INT} = sub { $killed = $_[0] }
  0         0  
212 65 50 33     2842 if ( !$_is_MSWin32 && $] ge '5.010001' );
213              
214             local $SIG{TTIN}, local $SIG{TTOU}, local $SIG{WINCH}
215 65 50       2539 if ( !$_is_MSWin32 );
216              
217 65         72549 $pid = fork();
218              
219 65 50       3577 if ( !defined $pid ) { # error
    100          
220 0         0 local $\; print {*STDERR} "fork error: $!\n";
  0         0  
  0         0  
221             }
222             elsif ( $pid ) { # parent
223 55         1604 $self->{WRK_ID} = $pid;
224 55         2509 $list->set($pid, $self);
225 55 100       8210 $mngd->{on_start}->($pid, $self->{ident}) if $mngd->{on_start};
226             }
227             else { # child
228 10         890 %{ $_LIST } = (), $_SELF = $self;
  10         2105  
229              
230             local $SIG{TERM} = local $SIG{INT} = local $SIG{ABRT} = \&_trap,
231             local $SIG{SEGV} = local $SIG{HUP} = \&_trap,
232 10         2444 local $SIG{QUIT} = \&_quit;
233 10         421 local $SIG{CHLD};
234              
235 10 50       481 MCE::Shared::init() if $INC{'MCE/Shared.pm'};
236 10 50       968 $_DATA->{ $_SELF->{PKG} }->set('S'.$$, '') unless $self->{IGNORE};
237 10 50       110 CORE::kill($killed, $$) if $killed;
238              
239             # Sets the seed of the base generator uniquely between workers.
240             # The new seed is computed using the current seed and ID value.
241             # One may set the seed at the application level for predictable
242             # results. Ditto for Math::Prime::Util, Math::Random, and
243             # Math::Random::MT::Auto.
244              
245 10         305 srand( abs($_DATA->{"$pkg:seed"} - ($id * 100000)) % 2147483560 );
246              
247 10 50       108 if ( $INC{'Math/Prime/Util.pm'} ) {
248             Math::Prime::Util::srand(
249 0         0 abs($_DATA->{"$pkg:seed"} - ($id * 100000)) % 2147483560
250             );
251             }
252              
253 10 50       155 if ( $INC{'Math/Random.pm'} ) {
254 0         0 my $cur_seed = Math::Random::random_get_seed();
255 0 0       0 my $new_seed = ($cur_seed < 1073741781)
256             ? $cur_seed + ((abs($id) * 10000) % 1073741780)
257             : $cur_seed - ((abs($id) * 10000) % 1073741780);
258              
259 0         0 Math::Random::random_set_seed($new_seed, $new_seed);
260             }
261              
262 10 50       149 if ( $INC{'Math/Random/MT/Auto.pm'} ) {
263 0         0 my $cur_seed = Math::Random::MT::Auto::get_seed()->[0];
264 0 0       0 my $new_seed = ($cur_seed < 1073741781)
265             ? $cur_seed + ((abs($id) * 10000) % 1073741780)
266             : $cur_seed - ((abs($id) * 10000) % 1073741780);
267              
268 0         0 Math::Random::MT::Auto::set_seed($new_seed);
269             }
270              
271 10         653 _dispatch($mngd, $func, \@args);
272             }
273             }
274              
275 55 0 33     2704 $MCE::_GMUTEX->unlock() if ( $_tid && $MCE::_GMUTEX );
276              
277 55 50       387 CORE::kill($killed, $$) if $killed;
278              
279 55 50       5738 return $pid ? $self : undef;
280             }
281              
282             ###############################################################################
283             ## ----------------------------------------------------------------------------
284             ## Public methods.
285             ##
286             ###############################################################################
287              
288             sub equal {
289 0 0 0 0 1 0 return 0 unless ( ref $_[0] && ref $_[1] );
290 0 0       0 $_[0]->{WRK_ID} == $_[1]->{WRK_ID} ? 1 : 0;
291             }
292              
293             sub error {
294 31 50   31 1 287 _croak('Usage: $child->error()') unless ref( my $self = $_[0] );
295 31 50       193 $self->join() unless $self->{REAPED};
296 31 50       383 $self->{ERROR} || undef;
297             }
298              
299             sub exit {
300 10 50 33 10 1 635 shift if ( defined $_[0] && $_[0] eq __PACKAGE__ );
301              
302 10 50       190 my ( $self ) = ( ref $_[0] ? shift : $_SELF );
303 10         115 my ( $pkg, $wrk_id ) = ( $self->{PKG}, $self->{WRK_ID} );
304              
305 10 50 33     215 if ( $wrk_id == $$ && $self->{MGR_ID} eq "$$.$_tid" ) {
    50          
306 0         0 MCE::Child->finish('MCE'); CORE::exit(@_);
  0         0  
307             }
308             elsif ( $wrk_id == $$ ) {
309 0   0     0 alarm 0; my ( $exit_status, @res ) = @_; $? = $exit_status || 0;
  0         0  
  0         0  
310 0 0       0 $_DATA->{$pkg}->set('R'.$wrk_id, @res ? \@res : '');
311 0         0 die "Child exited ($?)\n";
312 0         0 _exit($?); # not reached
313             }
314              
315 10 50       70 return $self if $self->{REAPED};
316              
317 10 50       65 if ( exists $_DATA->{$pkg} ) {
318 10         170 sleep 0.015 until $_DATA->{$pkg}->exists('S'.$wrk_id);
319             } else {
320 0         0 sleep 0.030;
321             }
322              
323 10 50       75 if ($_is_MSWin32) {
324 0 0       0 CORE::kill('KILL', $wrk_id) if CORE::kill('ZERO', $wrk_id);
325             } else {
326 10 50       475 CORE::kill('QUIT', $wrk_id) if CORE::kill('ZERO', $wrk_id);
327             }
328              
329 10         110 $self;
330             }
331              
332             sub finish {
333 26 50   26 1 3749 _croak('Usage: MCE::Child->finish()') if ref($_[0]);
334 26 50 33     360 shift if ( defined $_[0] && $_[0] eq __PACKAGE__ );
335              
336 26 100       307 my $pkg = defined($_[0]) ? $_[0] : caller();
337              
338 26 100       140 if ( $pkg eq 'MCE' ) {
    100          
339 23         69 for my $key ( keys %{ $_LIST } ) { MCE::Child->finish($key); }
  23         190  
  2         16  
340             }
341             elsif ( exists $_LIST->{$pkg} ) {
342 2 50       27 return if $MCE::Signal::KILLED;
343              
344 2 50       20 if ( exists $_DELY->{$pkg} ) {
345 2         20 &_force_reap($pkg);
346             delete($_DELY->{$pkg}), delete($_DATA->{"$pkg:seed"}),
347             delete($_LIST->{$pkg}), delete($_DATA->{"$pkg:id"}),
348 2         223 delete($_MNGD->{$pkg}), delete($_DATA->{ $pkg });
349             }
350             }
351              
352 26         93 @_ = ();
353              
354 26         1884 return;
355             }
356              
357             sub is_joinable {
358 24 50   24 1 104 _croak('Usage: $child->is_joinable()') unless ref( my $self = $_[0] );
359 24         152 my ( $wrk_id, $pkg ) = ( $self->{WRK_ID}, $self->{PKG} );
360              
361 24 50       160 if ( $wrk_id == $$ ) {
    50          
362 0         0 '';
363             }
364             elsif ( $self->{MGR_ID} eq "$$.$_tid" ) {
365 24 50       80 return '' if $self->{REAPED};
366 24         72 local $!; $_DATA->{$pkg}->reap_data;
  24         96  
367 24 50       432 ( waitpid($wrk_id, _WNOHANG) == 0 ) ? '' : do {
368 0 0       0 _reap_child($self, 0) unless $self->{REAPED};
369 0         0 1;
370             };
371             }
372             else {
373             # limitation for MCE::Child only; allowed for MCE::Hobo
374 0         0 _croak('Error: $child->is_joinable() not called by managed process');
375             }
376             }
377              
378             sub is_running {
379 24 50   24 1 11264 _croak('Usage: $child->is_running()') unless ref( my $self = $_[0] );
380 24         112 my ( $wrk_id, $pkg ) = ( $self->{WRK_ID}, $self->{PKG} );
381              
382 24 50       200 if ( $wrk_id == $$ ) {
    50          
383 0         0 1;
384             }
385             elsif ( $self->{MGR_ID} eq "$$.$_tid" ) {
386 24 50       96 return '' if $self->{REAPED};
387 24         72 local $!; $_DATA->{$pkg}->reap_data;
  24         112  
388 24 50       472 ( waitpid($wrk_id, _WNOHANG) == 0 ) ? 1 : do {
389 0 0       0 _reap_child($self, 0) unless $self->{REAPED};
390 0         0 '';
391             };
392             }
393             else {
394             # limitation for MCE::Child only; allowed for MCE::Hobo
395 0         0 _croak('Error: $child->is_running() not called by managed process');
396             }
397             }
398              
399             sub join {
400 42 50   42 1 22839 _croak('Usage: $child->join()') unless ref( my $self = $_[0] );
401 42         163 my ( $wrk_id, $pkg ) = ( $self->{WRK_ID}, $self->{PKG} );
402              
403 42 50       133 if ( $self->{REAPED} ) {
404 0 0       0 _croak('Child already joined') unless exists( $self->{RESULT} );
405 0 0       0 $_LIST->{$pkg}->del($wrk_id) if ( exists $_LIST->{$pkg} );
406              
407             return ( defined wantarray )
408 0 0       0 ? wantarray ? @{ delete $self->{RESULT} } : delete( $self->{RESULT} )->[-1]
  0 0       0  
409             : ();
410             }
411              
412 42 50       261 if ( $wrk_id == $$ ) {
    50          
413 0         0 _croak('Cannot join self');
414             }
415             elsif ( $self->{MGR_ID} eq "$$.$_tid" ) {
416             # remove from list after reaping
417 42         675 local $SIG{CHLD};
418 42         378 _reap_child($self, 1);
419 42         186 $_LIST->{$pkg}->del($wrk_id);
420             }
421             else {
422             # limitation for MCE::Child only; allowed for MCE::Hobo
423 0         0 _croak('Error: $child->join() not called by managed process');
424             }
425              
426 42 50       256 return unless ( exists $self->{RESULT} );
427              
428             ( defined wantarray )
429 42 50       457 ? wantarray ? @{ delete $self->{RESULT} } : delete( $self->{RESULT} )->[-1]
  0 100       0  
430             : ();
431             }
432              
433             sub kill {
434 5 50   5 1 150 _croak('Usage: $child->kill()') unless ref( my $self = $_[0] );
435 5         75 my ( $wrk_id, $pkg, $signal ) = ( $self->{WRK_ID}, $self->{PKG}, $_[1] );
436              
437 5 50       30 if ( $wrk_id == $$ ) {
438 0   0     0 CORE::kill($signal || 'INT', $$);
439 0         0 return $self;
440             }
441 5 50       55 if ( $self->{MGR_ID} eq "$$.$_tid" ) {
442 5 50       20 return $self if $self->{REAPED};
443 5 50       60 if ( exists $_DATA->{$pkg} ) {
444 5         35 sleep 0.015 until $_DATA->{$pkg}->exists('S'.$wrk_id);
445             } else {
446 0         0 sleep 0.030;
447             }
448             }
449              
450 5 50 50     195 CORE::kill($signal || 'INT', $wrk_id) if CORE::kill('ZERO', $wrk_id);
451              
452 5         25 $self;
453             }
454              
455             sub list {
456 8 50   8 1 5968 _croak('Usage: MCE::Child->list()') if ref($_[0]);
457 8         56 my $pkg = "$$.$_tid.".caller();
458              
459 8 50       48 ( exists $_LIST->{$pkg} ) ? $_LIST->{$pkg}->vals() : ();
460             }
461              
462             sub list_pids {
463 8 50   8 1 480 _croak('Usage: MCE::Child->list_pids()') if ref($_[0]);
464 8         304 my $pkg = "$$.$_tid.".caller(); local $_;
  8         184  
465              
466 8 50       336 ( exists $_LIST->{$pkg} ) ? map { $_->pid } $_LIST->{$pkg}->vals() : ();
  24         264  
467             }
468              
469             sub list_joinable {
470 8 50   8 1 5776 _croak('Usage: MCE::Child->list_joinable()') if ref($_[0]);
471 8         120 my $pkg = "$$.$_tid.".caller();
472              
473 8 50       304 return () unless ( my $list = $_LIST->{$pkg} );
474 8         88 local ($!, $?, $_);
475              
476 8         600 $_DATA->{$pkg}->reap_data;
477              
478             map {
479 8 50       56 ( waitpid($_->{WRK_ID}, _WNOHANG) == 0 ) ? () : do {
  24         384  
480 0 0       0 _reap_child($_, 0) unless $_->{REAPED};
481 0         0 $_;
482             };
483             }
484             $list->vals();
485             }
486              
487             sub list_running {
488 8 50   8 1 18296 _croak('Usage: MCE::Child->list_running()') if ref($_[0]);
489 8         112 my $pkg = "$$.$_tid.".caller();
490              
491 8 50       40 return () unless ( my $list = $_LIST->{$pkg} );
492 8         112 local ($!, $?, $_);
493              
494 8         120 $_DATA->{$pkg}->reap_data;
495              
496             map {
497 8 50       32 ( waitpid($_->{WRK_ID}, _WNOHANG) == 0 ) ? $_ : do {
  24         296  
498 0 0       0 _reap_child($_, 0) unless $_->{REAPED};
499 0         0 ();
500             };
501             }
502             $list->vals();
503             }
504              
505             sub max_workers {
506 17 50   17 1 61 _croak('Usage: MCE::Child->max_workers()') if ref($_[0]);
507 17   33     75 my $mngd = $_MNGD->{ "$$.$_tid.".caller() } || do {
508             # construct mngd internally on first use unless defined
509             init(); $_MNGD->{ "$$.$_tid.".caller() };
510             };
511 17 50       44 shift if ( $_[0] eq __PACKAGE__ );
512              
513 17 100       40 $mngd->{max_workers} = _max_workers(shift) if @_;
514 17         64 $mngd->{max_workers};
515             }
516              
517             sub pending {
518 8 50   8 1 5560 _croak('Usage: MCE::Child->pending()') if ref($_[0]);
519 8         48 my $pkg = "$$.$_tid.".caller();
520              
521 8 50       64 ( exists $_LIST->{$pkg} ) ? $_LIST->{$pkg}->len() : 0;
522             }
523              
524             sub pid {
525 31 50   31 1 423 ref($_[0]) ? $_[0]->{WRK_ID} : $_SELF->{WRK_ID};
526             }
527              
528             sub result {
529 7 50   7 1 176 _croak('Usage: $child->result()') unless ref( my $self = $_[0] );
530 7 50       34 return $self->join() unless $self->{REAPED};
531              
532 7 50       35 _croak('Child already joined') unless exists( $self->{RESULT} );
533 7 50       70 wantarray ? @{ delete $self->{RESULT} } : delete( $self->{RESULT} )->[-1];
  0         0  
534             }
535              
536             sub self {
537 0 0   0 1 0 ref($_[0]) ? $_[0] : $_SELF;
538             }
539              
540             sub wait_all {
541 1 50   1 1 68 _croak('Usage: MCE::Child->wait_all()') if ref($_[0]);
542 1         50 my $pkg = "$$.$_tid.".caller();
543              
544             return wantarray ? () : 0
545 1 0 33     78 if ( !exists $_LIST->{$pkg} || !$_LIST->{$pkg}->len() );
    50          
546              
547 1         16 local $_; ( wantarray )
548 0         0 ? map { $_->join(); $_ } $_LIST->{$pkg}->vals()
  0         0  
549 1 50       120 : map { $_->join(); () } $_LIST->{$pkg}->vals();
  3         19  
  3         12  
550             }
551              
552             *waitall = \&wait_all; # compatibility
553              
554             sub wait_one {
555 4 50   4 1 280 _croak('Usage: MCE::Child->wait_one()') if ref($_[0]);
556 4         180 my $pkg = "$$.$_tid.".caller();
557              
558             return undef
559 4 50 33     216 if ( !exists $_LIST->{$pkg} || !$_LIST->{$pkg}->len() );
560              
561 4         164 _wait_one($pkg);
562             }
563              
564             *waitone = \&wait_one; # compatibility
565              
566             sub yield {
567 0 0   0 1 0 _croak('Usage: MCE::Child->yield()') if ref($_[0]);
568 0 0 0     0 shift if ( defined $_[0] && $_[0] eq __PACKAGE__ );
569              
570 0   0     0 my $pkg = $_SELF->{PKG} || do {
571             my $mngd = $_MNGD->{ "$$.$_tid.".caller() } || do {
572             # construct mngd internally on first use unless defined
573             init(); $_MNGD->{ "$$.$_tid.".caller() };
574             };
575             $mngd->{PKG};
576             };
577              
578 0 0       0 return unless $_DELY->{$pkg};
579 0         0 my $seconds = $_DELY->{$pkg}->seconds(@_);
580              
581 0         0 MCE::Util::_sleep( $seconds );
582             }
583              
584             ###############################################################################
585             ## ----------------------------------------------------------------------------
586             ## Private methods.
587             ##
588             ###############################################################################
589              
590             sub _croak {
591 0 0   0   0 if ( $INC{'MCE.pm'} ) {
592 0         0 goto &MCE::_croak;
593             }
594             else {
595 0         0 $SIG{__DIE__} = \&MCE::Signal::_die_handler;
596 0         0 $SIG{__WARN__} = \&MCE::Signal::_warn_handler;
597              
598 0         0 $\ = undef; goto &Carp::croak;
  0         0  
599             }
600             }
601              
602             sub _dispatch {
603 10     10   176 my ( $mngd, $func, $args ) = @_;
604              
605 10         372 $mngd->{WRK_ID} = $_SELF->{WRK_ID} = $$, $? = 0;
606 10 50       84 $ENV{PERL_MCE_IPC} = 'win32' if $_is_MSWin32;
607              
608             {
609 10         68 local $!;
  10         303  
610 10 50       1082 (*STDERR)->autoflush(1) if defined( fileno *STDERR );
611 10 50       3389 (*STDOUT)->autoflush(1) if defined( fileno *STDOUT );
612             }
613              
614             # Run task.
615             my $child_timeout = ( exists $_SELF->{child_timeout} )
616 9 50       562 ? $_SELF->{child_timeout} : $mngd->{child_timeout};
617              
618             my $void_context = ( exists $_SELF->{void_context} )
619 9 50       127 ? $_SELF->{void_context} : $mngd->{void_context};
620              
621 9         39 my @res; my $timed_out = 0;
  9         22  
622              
623             local $SIG{'ALRM'} = sub {
624 0     0   0 alarm 0; $timed_out = 1; $SIG{__WARN__} = sub {};
  0         0  
  0         0  
625 0         0 die "Child timed out\n";
626 9         729 };
627              
628 9 50 33     552 if ( $void_context || $_SELF->{IGNORE} ) {
629 12     12   108 no strict 'refs';
  12         24  
  12         612  
630 0   0     0 eval { alarm($child_timeout || 0); $func->(@{ $args }) };
  0         0  
  0         0  
  0         0  
631             }
632             else {
633 12     12   227 no strict 'refs';
  12         465  
  12         2326  
634 9   50     145 @res = eval { alarm($child_timeout || 0); $func->(@{ $args }) };
  9         988  
  9         46  
  9         205  
635             }
636              
637 7         129 alarm 0;
638 7 50       80 $@ = "Child timed out" if $timed_out;
639              
640 7 50       86 if ( $@ ) {
641 0 0       0 _exit($?) if ( $@ =~ /^Child exited \(\S+\)$/ );
642 0         0 my $err = $@; $? = 1; $err =~ s/, <__ANONIO__> line \d+//;
  0         0  
  0         0  
643              
644 0 0       0 if ( ! $_SELF->{IGNORE} ) {
645             $_DATA->{ $_SELF->{PKG} }->set('S'.$$, $err),
646 0         0 $_DATA->{ $_SELF->{PKG} }->set('R'.$$, '');
647             }
648              
649 0 0 0     0 if ( !$timed_out && !$mngd->{on_finish} && !$INC{'MCE/Simple.pm'} ) {
      0        
650 12     12   7113 use bytes; warn "Child $$ terminated abnormally: reason $err\n";
  12         202  
  12         60  
  0         0  
651             }
652             }
653             else {
654 7 50       123 shift(@res) if ref($res[0]) =~ /^MCE::(?:Barrier|Semaphore)::_guard/s;
655             $_DATA->{ $_SELF->{PKG} }->set('R'.$$, @res ? \@res : '')
656 7 50       232 if ( ! $_SELF->{IGNORE} );
    50          
657             }
658              
659 7         135 _exit($?);
660             }
661              
662             sub _exit {
663 10     10   86 my ( $exit_status ) = @_;
664              
665             # Check for nested workers not yet joined.
666 10 50 66     214 MCE::Child->finish('MCE') if ( !$_SELF->{SIGNALED} && keys %{ $_LIST } );
  7         70  
667              
668             # Exit child process.
669 10 50   0   347 $SIG{__DIE__} = sub {} unless $_tid;
670 10     0   275 $SIG{__WARN__} = sub {};
671              
672 10 50 33     130 threads->exit($exit_status) if ( $INC{'threads.pm'} && $_is_MSWin32 );
673              
674             my $posix_exit = ( exists $_SELF->{posix_exit} )
675 10 50       171 ? $_SELF->{posix_exit} : $_MNGD->{ $_SELF->{PKG} }{posix_exit};
676              
677 10 0 33     106 if ( $posix_exit && !$_SELF->{SIGNALED} && !$_is_MSWin32 ) {
      33        
678 0         0 eval { MCE::Mutex::Channel::_destroy() };
  0         0  
679 0 0       0 POSIX::_exit($exit_status) if $INC{'POSIX.pm'};
680 0         0 CORE::kill('KILL', $$);
681             }
682              
683 10         4979 CORE::exit($exit_status);
684             }
685              
686             sub _force_reap {
687 9     9   63 my ( $count, $pkg ) = ( 0, @_ );
688 9 50 33     103 return unless ( exists $_LIST->{$pkg} && $_LIST->{$pkg}->len() );
689              
690 0         0 for my $child ( $_LIST->{$pkg}->vals() ) {
691 0 0       0 next if $child->{IGNORE};
692              
693 0 0       0 if ( $child->is_running() ) {
694 0 0       0 sleep(0.015), CORE::kill('KILL', $child->pid())
695             if CORE::kill('ZERO', $child->pid());
696 0         0 $count++;
697             }
698             }
699              
700 0         0 $_LIST->{$pkg}->clear();
701              
702 0 0 0     0 warn "Finished with active child processes [$pkg] ($count)\n"
703             if ( $count && !$_is_MSWin32 );
704              
705 0         0 return;
706             }
707              
708             sub _quit {
709 3 50   3   4276 return MCE::Signal::defer($_[0]) if $MCE::Signal::IPC;
710              
711 3         24 alarm 0; my ( $name ) = @_;
  3         22  
712 3         81 $_SELF->{SIGNALED} = 1, $name =~ s/^SIG//;
713              
714       0     $SIG{$name} = sub {}, CORE::kill($name, -$$)
715 3 50       156 if ( exists $SIG{$name} );
716              
717 3 50       18 if ( ! $_SELF->{IGNORE} ) {
718 3         27 my ( $pkg, $wrk_id ) = ( $_SELF->{PKG}, $_SELF->{WRK_ID} );
719 3         25 $_DATA->{$pkg}->set('R'.$wrk_id, '');
720             }
721              
722 3         23 _exit(0);
723             }
724              
725             sub _reap_child {
726 46     46   135 my ( $child, $wait_flag ) = @_;
727 46 50       215 return unless $child;
728              
729 46         285 local @_ = $_DATA->{ $child->{PKG} }->get( $child->{WRK_ID}, $wait_flag );
730              
731 46 100 50     965 ( $child->{ERROR}, $child->{RESULT}, $child->{REAPED} ) =
732             ( pop || '', length $_[0] ? pop : [], 1 );
733              
734 46 50       175 return if $child->{IGNORE};
735              
736 46   50     636 my ( $exit, $err ) = ( $? || 0, $child->{ERROR} );
737 46         171 my ( $code, $sig ) = ( $exit >> 8, $exit & 0x7f );
738              
739 46 50 33     154 if ( $code > 100 && !$err ) {
740 0 0       0 $code = 2, $sig = 1, $err = 'Child received SIGHUP' if $code == 101;
741 0 0       0 $code = 2, $sig = 2, $err = 'Child received SIGINT' if $code == 102;
742 0 0       0 $code = 2, $sig = 6, $err = 'Child received SIGABRT' if $code == 106;
743 0 0       0 $code = 2, $sig = 11, $err = 'Child received SIGSEGV' if $code == 111;
744 0 0       0 $code = 2, $sig = 15, $err = 'Child received SIGTERM' if $code == 115;
745              
746 0         0 $child->{ERROR} = $err;
747             }
748              
749 46 100       213 if ( my $on_finish = $_MNGD->{ $child->{PKG} }{on_finish} ) {
750             $on_finish->(
751             $child->{WRK_ID}, $code, $child->{ident}, $sig, $err,
752 7         69 @{ $child->{RESULT} }
  7         126  
753             );
754             }
755              
756 46         294 return;
757             }
758              
759             sub _trap {
760 0 0   0   0 return MCE::Signal::defer($_[0]) if $MCE::Signal::IPC;
761              
762 0         0 alarm 0; my ( $exit_status, $name ) = ( 2, @_ );
  0         0  
763 0         0 $_SELF->{SIGNALED} = 1, $name =~ s/^SIG//;
764              
765       0     $SIG{$name} = sub {}, CORE::kill($name, -$$)
766 0 0       0 if ( exists $SIG{$name} );
767              
768 0 0       0 if ( $name eq 'HUP' ) { $exit_status = 101 }
  0 0       0  
    0          
    0          
    0          
769 0         0 elsif ( $name eq 'INT' ) { $exit_status = 102 }
770 0         0 elsif ( $name eq 'ABRT' ) { $exit_status = 106 }
771 0         0 elsif ( $name eq 'SEGV' ) { $exit_status = 111 }
772 0         0 elsif ( $name eq 'TERM' ) { $exit_status = 115 }
773              
774 0 0       0 if ( ! $_SELF->{IGNORE} ) {
775 0         0 my ( $pkg, $wrk_id ) = ( $_SELF->{PKG}, $_SELF->{WRK_ID} );
776 0         0 $_DATA->{$pkg}->set('R'.$wrk_id, '');
777             }
778              
779 0         0 _exit($exit_status);
780             }
781              
782             sub _wait_one {
783 4     4   64 my ( $pkg ) = @_;
784 4         72 my ( $list, $self, $wrk_id ) = ( $_LIST->{$pkg} ); local $!;
  4         152  
785              
786 4         16 while () {
787 192         2864 $_DATA->{$pkg}->reap_data;
788 192         924 for my $child ( $list->vals() ) {
789 192         508 $wrk_id = $child->{WRK_ID};
790 192 50       480 return $list->del($wrk_id) if $child->{REAPED};
791 192 100       2488 $self = $list->del($wrk_id), last if waitpid($wrk_id, _WNOHANG);
792             }
793 192 100       624 last if $self;
794 188         5665832 sleep 0.030;
795             }
796              
797 4         60 _reap_child($self, 0);
798              
799 4         40 $self;
800             }
801              
802             ###############################################################################
803             ## ----------------------------------------------------------------------------
804             ## Delay implementation suited for MCE::Child.
805             ##
806             ###############################################################################
807              
808             package # hide from rpm
809             MCE::Child::_delay;
810              
811             sub new {
812 12     12   58 my ( $class, $chnl, $delay ) = @_;
813              
814 12 50       37 if ( !defined $delay ) {
815 12 50       109 $delay = ($^O =~ /mswin|mingw|msys|cygwin/i) ? 0.015 : 0.008;
816             }
817              
818 12         37 $chnl->send(undef);
819              
820 12         62 bless [ $delay, $chnl ], $class;
821             }
822              
823             sub seconds {
824 0     0   0 my ( $self, $how_long ) = @_;
825 0 0       0 my $delay = defined($how_long) ? $how_long : $self->[0];
826 0         0 my $lapse = $self->[1]->recv();
827 0         0 my $time = MCE::Util::_time();
828              
829 0 0 0     0 if ( !$delay || !defined $lapse ) {
    0          
830 0         0 $lapse = $time;
831             }
832             elsif ( $lapse + $delay - $time < 0 ) {
833 0         0 $lapse += int( abs($time - $lapse) / $delay + 0.5 ) * $delay;
834             }
835              
836 0         0 $self->[1]->send( $lapse += $delay );
837              
838 0         0 return $lapse - $time;
839             }
840              
841             ###############################################################################
842             ## ----------------------------------------------------------------------------
843             ## Hash and ordhash implementations suited for MCE::Child.
844             ##
845             ###############################################################################
846              
847             package # hide from rpm
848             MCE::Child::_hash;
849              
850 12     12   20887 use Time::HiRes 'sleep';
  12         24  
  12         83  
851              
852             use constant {
853 12 50       15437 _WNOHANG => ( $INC{'POSIX.pm'} )
    50          
854             ? &POSIX::WNOHANG : ( $^O eq 'solaris' ) ? 64 : 1
855 12     12   1744 };
  12         24  
856              
857             sub new {
858 12     12   59 my ( $class, $chnl ) = @_;
859              
860 12         48 bless [ {}, $chnl ], shift;
861             }
862              
863             sub clear {
864 7     7   25 my ( $self ) = @_;
865              
866 7         42 1 while ( $self->[1]->recv2_nb() );
867              
868 7         45 %{ $self->[0] } = ();
  7         24  
869             }
870              
871             sub exists {
872 15     15   60 my ( $self, $key ) = @_;
873              
874 15         155 while ( my $data = $self->[1]->recv2_nb() ) {
875 10         105 $self->[0]{ $data->[0] } = $data->[1];
876             }
877              
878 15         105 CORE::exists $self->[0]{ $key };
879             }
880              
881             sub get {
882 46     46   224 my ( $self, $wrk_id, $wait_flag ) = @_;
883              
884 46 100       250 if ( !CORE::exists $self->[0]{ 'R'.$wrk_id } ) {
885 28         184 while ( my $data = $self->[1]->recv2_nb() ) {
886 36         244 $self->[0]{ $data->[0] } = $data->[1];
887             }
888             }
889              
890 46 100       365 if ( $wait_flag ) {
891 42         477 local $!;
892 42 100       8461853 ( CORE::exists $self->[0]{ 'R'.$wrk_id } ) ? waitpid($wrk_id, 0) : do {
893 8         16 while () {
894 16         112 my $data = $self->[1]->recv2_nb();
895 16 100       64 if ( !defined $data ) {
896 8 50       80 last if waitpid($wrk_id, _WNOHANG);
897 8         8168 sleep(0.0009), next;
898             }
899 8         32 $self->[0]{ $data->[0] } = $data->[1];
900 8 50       11485944 waitpid($wrk_id, 0), last if $data->[0] eq 'R'.$wrk_id;
901             }
902 8 50       256 if ( !CORE::exists $self->[0]{ 'R'.$wrk_id } ) {
903 0         0 while ( my $data = $self->[1]->recv2_nb() ) {
904 0         0 $self->[0]{ $data->[0] } = $data->[1];
905             }
906             }
907             };
908             }
909              
910 46         374 my $result = delete $self->[0]{ 'R'.$wrk_id };
911 46         181 my $error = delete $self->[0]{ 'S'.$wrk_id };
912              
913 46 50       163 $result = '' unless ( defined $result );
914 46 50       155 $error = '' unless ( defined $error );
915              
916 46         592 return ( $result, $error );
917             }
918              
919             sub reap_data {
920 321     321   1110 my ( $self ) = @_;
921              
922 321 100       1176 if (wantarray) {
923 65         97 my @ret;
924 65         974 while ( my $data = $self->[1]->recv2_nb() ) {
925 17 50       229 push @ret, substr($data->[0], 1) if substr($data->[0], 0, 1) eq 'R';
926 17         276 $self->[0]{ $data->[0] } = $data->[1];
927             }
928 65         1147 return @ret;
929             }
930              
931 256         1976 while ( my $data = $self->[1]->recv2_nb() ) {
932 24         272 $self->[0]{ $data->[0] } = $data->[1];
933             }
934              
935 256         520 return;
936             }
937              
938             sub set {
939 20     20   867 $_[0]->[1]->send2([ $_[1], $_[2] ]);
940             }
941              
942             package # hide from rpm
943             MCE::Child::_ordhash;
944              
945 12     12   95 sub new { bless [ {}, [], {}, 0 ], shift; } # data, keys, indx, gcnt
946 0     0   0 sub exists { CORE::exists $_[0]->[0]{ $_[1] }; }
947 0     0   0 sub get { $_[0]->[0]{ $_[1] }; }
948 22     22   142 sub len { scalar keys %{ $_[0]->[0] }; }
  22         357  
949              
950             sub clear {
951 0     0   0 my ( $self ) = @_;
952 0         0 %{ $self->[0] } = @{ $self->[1] } = %{ $self->[2] } = (), $self->[3] = 0;
  0         0  
  0         0  
  0         0  
953              
954 0         0 return;
955             }
956              
957             sub del {
958 46     46   119 my ( $self, $key ) = @_;
959 46 50       245 return undef unless defined( my $off = delete $self->[2]{$key} );
960              
961             # tombstone
962 46         115 $self->[1][$off] = undef;
963              
964             # GC keys and refresh index
965 46 100       135 if ( ++$self->[3] > @{ $self->[1] } * 0.667 ) {
  46         317  
966 18         118 my ( $keys, $indx ) = ( $self->[1], $self->[2] );
967 18         78 my $i; $i = $self->[3] = 0;
  18         62  
968 18         44 for my $k ( @{ $keys } ) {
  18         145  
969 46 50       153 $keys->[$i] = $k, $indx->{$k} = $i++ if defined($k);
970             }
971 18         45 splice @{ $keys }, $i;
  18         94  
972             }
973              
974 46         1022 delete $self->[0]{$key};
975             }
976              
977             sub set {
978 55     55   1291 my ( $self, $key ) = @_;
979 55 50       1110 $self->[0]{$key} = $_[2], return 1 if exists($self->[0]{$key});
980              
981 55         102 $self->[2]{$key} = @{ $self->[1] }; push @{ $self->[1] }, $key;
  55         1891  
  55         283  
  55         1098  
982 55         604 $self->[0]{$key} = $_[2];
983              
984 55         164 return 1;
985             }
986              
987             sub vals {
988 225     225   586 my ( $self ) = @_;
989             $self->[3]
990 0         0 ? @{ $self->[0] }{ grep defined($_), @{ $self->[1] } }
  0         0  
991 225 50       584 : @{ $self->[0] }{ @{ $self->[1] } };
  225         1180  
  225         505  
992             }
993              
994             1;
995              
996             __END__