File Coverage

blib/lib/MCE/Child.pm
Criterion Covered Total %
statement 377 521 72.3
branch 177 406 43.6
condition 50 161 31.0
subroutine 55 74 74.3
pod 23 23 100.0
total 682 1185 57.5


line stmt bran cond sub pod time code
1             ###############################################################################
2             ## ----------------------------------------------------------------------------
3             ## A threads-like parallelization module compatible with Perl 5.8.
4             ##
5             ###############################################################################
6              
7 12     12   1188822 use strict;
  12         108  
  12         335  
8 12     12   48 use warnings;
  12         24  
  12         325  
9              
10 12     12   49 no warnings qw( threads recursion uninitialized once redefine );
  12         13  
  12         684  
11              
12             package MCE::Child;
13              
14             our $VERSION = '1.887';
15              
16             ## no critic (BuiltinFunctions::ProhibitStringyEval)
17             ## no critic (Subroutines::ProhibitExplicitReturnUndef)
18             ## no critic (Subroutines::ProhibitSubroutinePrototypes)
19             ## no critic (TestingAndDebugging::ProhibitNoStrict)
20              
21 12     12   4407 use MCE::Signal ();
  12         24  
  12         371  
22 12     12   5130 use MCE::Mutex ();
  12         24  
  12         214  
23 12     12   4958 use MCE::Channel ();
  12         36  
  12         309  
24 12     12   63 use Time::HiRes 'sleep';
  12         13  
  12         73  
25              
26             use overload (
27             q(==) => \&equal,
28 0     0   0 q(!=) => sub { !equal(@_) },
29 12         107 fallback => 1
30 12     12   13129 );
  12         9939  
31              
32             sub import {
33 12 50   12   134 if (caller !~ /^MCE::/) {
34 12     12   1118 no strict 'refs'; no warnings 'redefine';
  12     12   24  
  12         409  
  12         70  
  12         13  
  12         1172  
35 12         23 *{ caller().'::mce_child' } = \&mce_child;
  12         61  
36             }
37 12         132 return;
38             }
39              
40             ## The POSIX module has many symbols. Try not loading it simply
41             ## to have WNOHANG. The following covers most platforms.
42              
43             use constant {
44 12 0       45941 _WNOHANG => ( $INC{'POSIX.pm'} )
    50          
45             ? &POSIX::WNOHANG : ( $^O eq 'solaris' ) ? 64 : 1
46 12     12   72 };
  12         12  
47              
48             my ( $_MNGD, $_DATA, $_DELY, $_LIST ) = ( {}, {}, {}, {} );
49              
50             my $_is_MSWin32 = ( $^O eq 'MSWin32' ) ? 1 : 0;
51             my $_tid = ( $INC{'threads.pm'} ) ? threads->tid() : 0;
52              
53             sub CLONE {
54 0 0   0   0 $_tid = threads->tid(), &_clear() if $INC{'threads.pm'};
55             }
56              
57             sub _clear {
58 0     0   0 %{ $_LIST } = ();
  0         0  
59             }
60              
61             sub _max_workers {
62 10     10   18 my ( $cpus ) = @_;
63 10 100       42 if ( $cpus eq 'auto' ) {
    100          
64 1         4 $cpus = MCE::Util::get_ncpu();
65             }
66             elsif ( $cpus =~ /^([0-9.]+)%$/ ) {
67 6         25 my ( $percent, $ncpu ) = ( $1 / 100, MCE::Util::get_ncpu() );
68 6         23 $cpus = $ncpu * $percent + 0.5;
69             }
70 10 100 66     77 $cpus = 1 if $cpus !~ /^[\d\.]+$/ || $cpus < 1;
71 10         21 return int($cpus);
72             }
73              
74             ###############################################################################
75             ## ----------------------------------------------------------------------------
76             ## Init routine.
77             ##
78             ###############################################################################
79              
80             bless my $_SELF = { MGR_ID => "$$.$_tid", WRK_ID => $$ }, __PACKAGE__;
81              
82             sub init {
83 19 100 66 19 1 3919 shift if ( defined $_[0] && $_[0] eq __PACKAGE__ );
84              
85             # -- options ----------------------------------------------------------
86             # max_workers child_timeout posix_exit on_start on_finish void_context
87             # ---------------------------------------------------------------------
88              
89 19 100       166 my $pkg = "$$.$_tid.".( caller eq __PACKAGE__ ? caller(1) : caller );
90 19 50       148 my $mngd = $_MNGD->{$pkg} = ( ref $_[0] eq 'HASH' ) ? shift : { @_ };
91              
92 19         44 @_ = ();
93              
94             $mngd->{MGR_ID} = "$$.$_tid", $mngd->{PKG} = $pkg,
95 19         96 $mngd->{WRK_ID} = $$;
96              
97 19 100       793 &_force_reap($pkg), $_DATA->{$pkg}->clear() if ( exists $_LIST->{$pkg} );
98              
99 19 100       69 if ( !exists $_LIST->{$pkg} ) {
100 12 0 33     36 $MCE::_GMUTEX->lock() if ( $_tid && $MCE::_GMUTEX );
101 12 50       36 sleep 0.015 if $_tid;
102              
103             # Start the shared-manager process if not running.
104 12 50       48 MCE::Shared->start() if $INC{'MCE/Shared.pm'};
105              
106 12         73 my $chnl = MCE::Channel->new( impl => 'Mutex' );
107 12         84 $_LIST->{ $pkg } = MCE::Child::_ordhash->new();
108 12         72 $_DELY->{ $pkg } = MCE::Child::_delay->new( $chnl );
109 12         78 $_DATA->{ $pkg } = MCE::Child::_hash->new( $chnl );
110 12         358 $_DATA->{"$pkg:seed"} = int(rand() * 1e9);
111 12         59 $_DATA->{"$pkg:id" } = 0;
112              
113 12 0 33     48 $MCE::_GMUTEX->unlock() if ( $_tid && $MCE::_GMUTEX );
114             }
115              
116 19 50       55 if ( !exists $mngd->{posix_exit} ) {
117             $mngd->{posix_exit} = 1 if (
118             $^S || $_tid || $INC{'Mojo/IOLoop.pm'} ||
119             $INC{'Coro.pm'} || $INC{'LWP/UserAgent.pm'} || $INC{'stfl.pm'} ||
120             $INC{'Curses.pm'} || $INC{'CGI.pm'} || $INC{'FCGI.pm'} ||
121             $INC{'Tk.pm'} || $INC{'Wx.pm'} || $INC{'Win32/GUI.pm'} ||
122 19 50 33     749 $INC{'Gearman/Util.pm'} || $INC{'Gearman/XS.pm'}
      33        
      33        
      33        
      33        
      33        
      33        
      33        
      33        
      33        
      33        
      33        
      33        
123             );
124             }
125              
126 19 100       55 if ( defined $mngd->{max_workers} ) {
127 3         8 $mngd->{max_workers} = _max_workers($mngd->{max_workers});
128             }
129              
130 19 50 33     51 if ( $INC{'LWP/UserAgent.pm'} && !$INC{'Net/HTTP.pm'} ) {
131 0         0 local $@; eval 'require Net::HTTP; require Net::HTTPS';
  0         0  
132             }
133              
134             require POSIX
135 19 50 66     96 if ( $mngd->{on_finish} && !$INC{'POSIX.pm'} && !$_is_MSWin32 );
      33        
136              
137 19         47 return;
138             }
139              
140             ###############################################################################
141             ## ----------------------------------------------------------------------------
142             ## 'new', 'mce_child', and 'create' for threads-like similarity.
143             ##
144             ###############################################################################
145              
146             ## 'new' and 'tid' are aliases for 'create' and 'pid' respectively.
147              
148             *new = \&create, *tid = \&pid;
149              
150             ## Use "goto" trick to avoid pad problems from 5.8.1 (fixed in 5.8.2)
151             ## Tip found in threads::async.
152              
153             sub mce_child (&;@) {
154 0     0 1 0 goto &create;
155             }
156              
157             sub create {
158 65   66 65 1 17291 my $mngd = $_MNGD->{ "$$.$_tid.".caller() } || do {
159             # construct mngd internally on first use unless defined
160             init(); $_MNGD->{ "$$.$_tid.".caller() };
161             };
162              
163 65 50       442 shift if ( $_[0] eq __PACKAGE__ );
164              
165             # ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~
166              
167 65 50       583 my $self = bless ref $_[0] eq 'HASH' ? { %{ shift() } } : { }, __PACKAGE__;
  0         0  
168              
169 65 50       272 $self->{IGNORE} = 1 if $SIG{CHLD} eq 'IGNORE';
170 65         899 $self->{MGR_ID} = $mngd->{MGR_ID}, $self->{PKG} = $mngd->{PKG};
171 65 50 33     444 $self->{ident } = shift if ( !ref $_[0] && ref $_[1] eq 'CODE' );
172              
173 65 0 33     118 my $func = shift; $func = caller().'::'.$func
  65   33     371  
174             if ( !ref $func && length $func && index($func,':') < 0 );
175              
176 65 50       152 if ( !defined $func ) {
177 0         0 local $\; print {*STDERR} "code function is not specified or valid\n";
  0         0  
  0         0  
178 0         0 return undef;
179             }
180              
181             my ( $list, $max_workers, $pkg ) = (
182             $_LIST->{ $mngd->{PKG} }, $mngd->{max_workers}, $mngd->{PKG}
183 65         191 );
184              
185 65 50       459 $_DATA->{"$pkg:id"} = 10000 if ( ( my $id = ++$_DATA->{"$pkg:id"} ) >= 2e9 );
186              
187             {
188             # Reap completed child processes.
189 65         107 local ($SIG{CHLD}, $!, $?, $_); map {
  65         2470  
190 2         2327656 waitpid($_, 0); _reap_child($list->del($_), 0); ();
  2         50  
  2         50  
191             }
192 65         761 $_DATA->{$pkg}->reap_data;
193             }
194              
195 65 50 33     833 if ( $max_workers || $self->{IGNORE} ) {
196             # Wait for a slot if saturated.
197 0 0 0     0 if ( $max_workers && $list->len() >= $max_workers ) {
198 0         0 my $count = $list->len() - $max_workers + 1;
199 0         0 _wait_one($pkg) for 1 .. $count;
200             }
201             }
202              
203             # ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~
204              
205 65 0 33     168 $MCE::_GMUTEX->lock() if ( $_tid && $MCE::_GMUTEX );
206              
207 65         148 my @args = @_; @_ = (); # To avoid (Scalars leaked: N) messages
  65         84  
208 65         88 my ( $killed, $pid );
209              
210             {
211 65     0   99 local $SIG{TERM} = local $SIG{INT} = sub { $killed = $_[0] }
  0         0  
212 65 50 33     2152 if ( !$_is_MSWin32 && $] ge '5.010001' );
213              
214             local $SIG{TTIN}, local $SIG{TTOU}, local $SIG{WINCH}
215 65 50       2139 if ( !$_is_MSWin32 );
216              
217 65         55340 $pid = fork();
218              
219 65 50       3196 if ( !defined $pid ) { # error
    100          
220 0         0 local $\; print {*STDERR} "fork error: $!\n";
  0         0  
  0         0  
221             }
222             elsif ( $pid ) { # parent
223 55         1462 $self->{WRK_ID} = $pid;
224 55         1754 $list->set($pid, $self);
225 55 100       6078 $mngd->{on_start}->($pid, $self->{ident}) if $mngd->{on_start};
226             }
227             else { # child
228 10         498 %{ $_LIST } = (), $_SELF = $self;
  10         1303  
229              
230             local $SIG{TERM} = local $SIG{INT} = local $SIG{ABRT} = \&_trap,
231             local $SIG{SEGV} = local $SIG{HUP} = \&_trap,
232 10         1469 local $SIG{QUIT} = \&_quit;
233 10         166 local $SIG{CHLD};
234              
235 10 50       317 MCE::Shared::init() if $INC{'MCE/Shared.pm'};
236 10 50       1331 $_DATA->{ $_SELF->{PKG} }->set('S'.$$, '') unless $self->{IGNORE};
237 10 50       57 CORE::kill($killed, $$) if $killed;
238              
239             # Sets the seed of the base generator uniquely between workers.
240             # The new seed is computed using the current seed and ID value.
241             # One may set the seed at the application level for predictable
242             # results. Ditto for Math::Prime::Util, Math::Random, and
243             # Math::Random::MT::Auto.
244              
245 10         195 srand( abs($_DATA->{"$pkg:seed"} - ($id * 100000)) % 2147483560 );
246              
247 10 50       109 if ( $INC{'Math/Prime/Util.pm'} ) {
248             Math::Prime::Util::srand(
249 0         0 abs($_DATA->{"$pkg:seed"} - ($id * 100000)) % 2147483560
250             );
251             }
252              
253 10 50       107 if ( $INC{'Math/Random.pm'} ) {
254 0         0 my $cur_seed = Math::Random::random_get_seed();
255 0 0       0 my $new_seed = ($cur_seed < 1073741781)
256             ? $cur_seed + ((abs($id) * 10000) % 1073741780)
257             : $cur_seed - ((abs($id) * 10000) % 1073741780);
258              
259 0         0 Math::Random::random_set_seed($new_seed, $new_seed);
260             }
261              
262 10 50       76 if ( $INC{'Math/Random/MT/Auto.pm'} ) {
263 0         0 my $cur_seed = Math::Random::MT::Auto::get_seed()->[0];
264 0 0       0 my $new_seed = ($cur_seed < 1073741781)
265             ? $cur_seed + ((abs($id) * 10000) % 1073741780)
266             : $cur_seed - ((abs($id) * 10000) % 1073741780);
267              
268 0         0 Math::Random::MT::Auto::set_seed($new_seed);
269             }
270              
271 10         500 _dispatch($mngd, $func, \@args);
272             }
273             }
274              
275 55 0 33     1831 $MCE::_GMUTEX->unlock() if ( $_tid && $MCE::_GMUTEX );
276              
277 55 50       208 CORE::kill($killed, $$) if $killed;
278              
279 55 50       3939 return $pid ? $self : undef;
280             }
281              
282             ###############################################################################
283             ## ----------------------------------------------------------------------------
284             ## Public methods.
285             ##
286             ###############################################################################
287              
288             sub equal {
289 0 0 0 0 1 0 return 0 unless ( ref $_[0] && ref $_[1] );
290 0 0       0 $_[0]->{WRK_ID} == $_[1]->{WRK_ID} ? 1 : 0;
291             }
292              
293             sub error {
294 31 50   31 1 651 _croak('Usage: $child->error()') unless ref( my $self = $_[0] );
295 31 50       117 $self->join() unless $self->{REAPED};
296 31 50       289 $self->{ERROR} || undef;
297             }
298              
299             sub exit {
300 10 50 33 10 1 635 shift if ( defined $_[0] && $_[0] eq __PACKAGE__ );
301              
302 10 50       205 my ( $self ) = ( ref $_[0] ? shift : $_SELF );
303 10         65 my ( $pkg, $wrk_id ) = ( $self->{PKG}, $self->{WRK_ID} );
304              
305 10 50 33     160 if ( $wrk_id == $$ && $self->{MGR_ID} eq "$$.$_tid" ) {
    50          
306 0         0 MCE::Child->finish('MCE'); CORE::exit(@_);
  0         0  
307             }
308             elsif ( $wrk_id == $$ ) {
309 0   0     0 alarm 0; my ( $exit_status, @res ) = @_; $? = $exit_status || 0;
  0         0  
  0         0  
310 0 0       0 $_DATA->{$pkg}->set('R'.$wrk_id, @res ? \@res : '');
311 0         0 die "Child exited ($?)\n";
312 0         0 _exit($?); # not reached
313             }
314              
315 10 50       100 return $self if $self->{REAPED};
316              
317 10 50       25 if ( exists $_DATA->{$pkg} ) {
318 10         175 sleep 0.015 until $_DATA->{$pkg}->exists('S'.$wrk_id);
319             } else {
320 0         0 sleep 0.030;
321             }
322              
323 10 50       25 if ($_is_MSWin32) {
324 0 0       0 CORE::kill('KILL', $wrk_id) if CORE::kill('ZERO', $wrk_id);
325             } else {
326 10 50       375 CORE::kill('QUIT', $wrk_id) if CORE::kill('ZERO', $wrk_id);
327             }
328              
329 10         125 $self;
330             }
331              
332             sub finish {
333 26 50   26 1 3349 _croak('Usage: MCE::Child->finish()') if ref($_[0]);
334 26 50 33     286 shift if ( defined $_[0] && $_[0] eq __PACKAGE__ );
335              
336 26 100       164 my $pkg = defined($_[0]) ? $_[0] : caller();
337              
338 26 100       192 if ( $pkg eq 'MCE' ) {
    100          
339 23         37 for my $key ( keys %{ $_LIST } ) { MCE::Child->finish($key); }
  23         218  
  2         21  
340             }
341             elsif ( exists $_LIST->{$pkg} ) {
342 2 50       6 return if $MCE::Signal::KILLED;
343              
344 2 50       6 if ( exists $_DELY->{$pkg} ) {
345 2         13 &_force_reap($pkg);
346             delete($_DELY->{$pkg}), delete($_DATA->{"$pkg:seed"}),
347             delete($_LIST->{$pkg}), delete($_DATA->{"$pkg:id"}),
348 2         114 delete($_MNGD->{$pkg}), delete($_DATA->{ $pkg });
349             }
350             }
351              
352 26         184 @_ = ();
353              
354 26         1763 return;
355             }
356              
357             sub is_joinable {
358 24 50   24 1 80 _croak('Usage: $child->is_joinable()') unless ref( my $self = $_[0] );
359 24         56 my ( $wrk_id, $pkg ) = ( $self->{WRK_ID}, $self->{PKG} );
360              
361 24 50       120 if ( $wrk_id == $$ ) {
    50          
362 0         0 '';
363             }
364             elsif ( $self->{MGR_ID} eq "$$.$_tid" ) {
365 24 50       48 return '' if $self->{REAPED};
366 24         64 local $!; $_DATA->{$pkg}->reap_data;
  24         64  
367 24 50       336 ( waitpid($wrk_id, _WNOHANG) == 0 ) ? '' : do {
368 0 0       0 _reap_child($self, 0) unless $self->{REAPED};
369 0         0 1;
370             };
371             }
372             else {
373             # limitation for MCE::Child only; allowed for MCE::Hobo
374 0         0 _croak('Error: $child->is_joinable() not called by managed process');
375             }
376             }
377              
378             sub is_running {
379 24 50   24 1 10272 _croak('Usage: $child->is_running()') unless ref( my $self = $_[0] );
380 24         64 my ( $wrk_id, $pkg ) = ( $self->{WRK_ID}, $self->{PKG} );
381              
382 24 50       120 if ( $wrk_id == $$ ) {
    50          
383 0         0 1;
384             }
385             elsif ( $self->{MGR_ID} eq "$$.$_tid" ) {
386 24 50       64 return '' if $self->{REAPED};
387 24         56 local $!; $_DATA->{$pkg}->reap_data;
  24         80  
388 24 50       296 ( waitpid($wrk_id, _WNOHANG) == 0 ) ? 1 : do {
389 0 0       0 _reap_child($self, 0) unless $self->{REAPED};
390 0         0 '';
391             };
392             }
393             else {
394             # limitation for MCE::Child only; allowed for MCE::Hobo
395 0         0 _croak('Error: $child->is_running() not called by managed process');
396             }
397             }
398              
399             sub join {
400 41 50   41 1 18041 _croak('Usage: $child->join()') unless ref( my $self = $_[0] );
401 41         254 my ( $wrk_id, $pkg ) = ( $self->{WRK_ID}, $self->{PKG} );
402              
403 41 50       134 if ( $self->{REAPED} ) {
404 0 0       0 _croak('Child already joined') unless exists( $self->{RESULT} );
405 0 0       0 $_LIST->{$pkg}->del($wrk_id) if ( exists $_LIST->{$pkg} );
406              
407             return ( defined wantarray )
408 0 0       0 ? wantarray ? @{ delete $self->{RESULT} } : delete( $self->{RESULT} )->[-1]
  0 0       0  
409             : ();
410             }
411              
412 41 50       281 if ( $wrk_id == $$ ) {
    50          
413 0         0 _croak('Cannot join self');
414             }
415             elsif ( $self->{MGR_ID} eq "$$.$_tid" ) {
416             # remove from list after reaping
417 41         554 local $SIG{CHLD};
418 41         391 _reap_child($self, 1);
419 41         260 $_LIST->{$pkg}->del($wrk_id);
420             }
421             else {
422             # limitation for MCE::Child only; allowed for MCE::Hobo
423 0         0 _croak('Error: $child->join() not called by managed process');
424             }
425              
426 41 50       682 return unless ( exists $self->{RESULT} );
427              
428             ( defined wantarray )
429 41 50       472 ? wantarray ? @{ delete $self->{RESULT} } : delete( $self->{RESULT} )->[-1]
  0 100       0  
430             : ();
431             }
432              
433             sub kill {
434 5 50   5 1 90 _croak('Usage: $child->kill()') unless ref( my $self = $_[0] );
435 5         185 my ( $wrk_id, $pkg, $signal ) = ( $self->{WRK_ID}, $self->{PKG}, $_[1] );
436              
437 5 50       60 if ( $wrk_id == $$ ) {
438 0   0     0 CORE::kill($signal || 'INT', $$);
439 0         0 return $self;
440             }
441 5 50       65 if ( $self->{MGR_ID} eq "$$.$_tid" ) {
442 5 50       60 return $self if $self->{REAPED};
443 5 50       25 if ( exists $_DATA->{$pkg} ) {
444 5         30 sleep 0.015 until $_DATA->{$pkg}->exists('S'.$wrk_id);
445             } else {
446 0         0 sleep 0.030;
447             }
448             }
449              
450 5 50 50     280 CORE::kill($signal || 'INT', $wrk_id) if CORE::kill('ZERO', $wrk_id);
451              
452 5         30 $self;
453             }
454              
455             sub list {
456 8 50   8 1 5160 _croak('Usage: MCE::Child->list()') if ref($_[0]);
457 8         48 my $pkg = "$$.$_tid.".caller();
458              
459 8 50       48 ( exists $_LIST->{$pkg} ) ? $_LIST->{$pkg}->vals() : ();
460             }
461              
462             sub list_pids {
463 8 50   8 1 472 _croak('Usage: MCE::Child->list_pids()') if ref($_[0]);
464 8         248 my $pkg = "$$.$_tid.".caller(); local $_;
  8         744  
465              
466 8 50       264 ( exists $_LIST->{$pkg} ) ? map { $_->pid } $_LIST->{$pkg}->vals() : ();
  24         240  
467             }
468              
469             sub list_joinable {
470 8 50   8 1 7672 _croak('Usage: MCE::Child->list_joinable()') if ref($_[0]);
471 8         56 my $pkg = "$$.$_tid.".caller();
472              
473 8 50       32 return () unless ( my $list = $_LIST->{$pkg} );
474 8         32 local ($!, $?, $_);
475              
476 8         24 $_DATA->{$pkg}->reap_data;
477              
478             map {
479 8 50       56 ( waitpid($_->{WRK_ID}, _WNOHANG) == 0 ) ? () : do {
  24         184  
480 0 0       0 _reap_child($_, 0) unless $_->{REAPED};
481 0         0 $_;
482             };
483             }
484             $list->vals();
485             }
486              
487             sub list_running {
488 8 50   8 1 16880 _croak('Usage: MCE::Child->list_running()') if ref($_[0]);
489 8         56 my $pkg = "$$.$_tid.".caller();
490              
491 8 50       96 return () unless ( my $list = $_LIST->{$pkg} );
492 8         200 local ($!, $?, $_);
493              
494 8         184 $_DATA->{$pkg}->reap_data;
495              
496             map {
497 8 50       32 ( waitpid($_->{WRK_ID}, _WNOHANG) == 0 ) ? $_ : do {
  24         216  
498 0 0       0 _reap_child($_, 0) unless $_->{REAPED};
499 0         0 ();
500             };
501             }
502             $list->vals();
503             }
504              
505             sub max_workers {
506 17 50   17 1 48 _croak('Usage: MCE::Child->max_workers()') if ref($_[0]);
507 17   33     64 my $mngd = $_MNGD->{ "$$.$_tid.".caller() } || do {
508             # construct mngd internally on first use unless defined
509             init(); $_MNGD->{ "$$.$_tid.".caller() };
510             };
511 17 50       30 shift if ( $_[0] eq __PACKAGE__ );
512              
513 17 100       36 $mngd->{max_workers} = _max_workers(shift) if @_;
514 17         49 $mngd->{max_workers};
515             }
516              
517             sub pending {
518 8 50   8 1 5064 _croak('Usage: MCE::Child->pending()') if ref($_[0]);
519 8         40 my $pkg = "$$.$_tid.".caller();
520              
521 8 50       40 ( exists $_LIST->{$pkg} ) ? $_LIST->{$pkg}->len() : 0;
522             }
523              
524             sub pid {
525 31 50   31 1 158 ref($_[0]) ? $_[0]->{WRK_ID} : $_SELF->{WRK_ID};
526             }
527              
528             sub result {
529 7 50   7 1 89 _croak('Usage: $child->result()') unless ref( my $self = $_[0] );
530 7 50       42 return $self->join() unless $self->{REAPED};
531              
532 7 50       44 _croak('Child already joined') unless exists( $self->{RESULT} );
533 7 50       50 wantarray ? @{ delete $self->{RESULT} } : delete( $self->{RESULT} )->[-1];
  0         0  
534             }
535              
536             sub self {
537 0 0   0 1 0 ref($_[0]) ? $_[0] : $_SELF;
538             }
539              
540             sub wait_all {
541 1 50   1 1 48 _croak('Usage: MCE::Child->wait_all()') if ref($_[0]);
542 1         37 my $pkg = "$$.$_tid.".caller();
543              
544             return wantarray ? () : 0
545 1 0 33     33 if ( !exists $_LIST->{$pkg} || !$_LIST->{$pkg}->len() );
    50          
546              
547 1         25 local $_; ( wantarray )
548 0         0 ? map { $_->join(); $_ } $_LIST->{$pkg}->vals()
  0         0  
549 1 50       36 : map { $_->join(); () } $_LIST->{$pkg}->vals();
  2         31  
  2         9  
550             }
551              
552             *waitall = \&wait_all; # compatibility
553              
554             sub wait_one {
555 4 50   4 1 200 _croak('Usage: MCE::Child->wait_one()') if ref($_[0]);
556 4         188 my $pkg = "$$.$_tid.".caller();
557              
558             return undef
559 4 50 33     124 if ( !exists $_LIST->{$pkg} || !$_LIST->{$pkg}->len() );
560              
561 4         160 _wait_one($pkg);
562             }
563              
564             *waitone = \&wait_one; # compatibility
565              
566             sub yield {
567 0 0   0 1 0 _croak('Usage: MCE::Child->yield()') if ref($_[0]);
568 0 0 0     0 shift if ( defined $_[0] && $_[0] eq __PACKAGE__ );
569              
570 0   0     0 my $pkg = $_SELF->{PKG} || do {
571             my $mngd = $_MNGD->{ "$$.$_tid.".caller() } || do {
572             # construct mngd internally on first use unless defined
573             init(); $_MNGD->{ "$$.$_tid.".caller() };
574             };
575             $mngd->{PKG};
576             };
577              
578 0 0       0 return unless $_DELY->{$pkg};
579 0         0 my $seconds = $_DELY->{$pkg}->seconds(@_);
580              
581 0         0 MCE::Util::_sleep( $seconds );
582             }
583              
584             ###############################################################################
585             ## ----------------------------------------------------------------------------
586             ## Private methods.
587             ##
588             ###############################################################################
589              
590             sub _croak {
591 0 0   0   0 if ( $INC{'MCE.pm'} ) {
592 0         0 goto &MCE::_croak;
593             }
594             else {
595 0         0 $SIG{__DIE__} = \&MCE::Signal::_die_handler;
596 0         0 $SIG{__WARN__} = \&MCE::Signal::_warn_handler;
597              
598 0         0 $\ = undef; goto &Carp::croak;
  0         0  
599             }
600             }
601              
602             sub _dispatch {
603 10     10   51 my ( $mngd, $func, $args ) = @_;
604              
605 10         198 $mngd->{WRK_ID} = $_SELF->{WRK_ID} = $$, $? = 0;
606 10 50       76 $ENV{PERL_MCE_IPC} = 'win32' if $_is_MSWin32;
607              
608             {
609 10         22 local $!;
  10         177  
610 10 50       420 (*STDERR)->autoflush(1) if defined( fileno *STDERR );
611 10 50       4199 (*STDOUT)->autoflush(1) if defined( fileno *STDOUT );
612             }
613              
614             # Run task.
615             my $child_timeout = ( exists $_SELF->{child_timeout} )
616 10 50       370 ? $_SELF->{child_timeout} : $mngd->{child_timeout};
617              
618             my $void_context = ( exists $_SELF->{void_context} )
619 10 50       55 ? $_SELF->{void_context} : $mngd->{void_context};
620              
621 10         32 my @res; my $timed_out = 0;
  10         23  
622              
623             local $SIG{'ALRM'} = sub {
624 0     0   0 alarm 0; $timed_out = 1; $SIG{__WARN__} = sub {};
  0         0  
  0         0  
625 0         0 die "Child timed out\n";
626 10         616 };
627              
628 10 50 33     264 if ( $void_context || $_SELF->{IGNORE} ) {
629 12     12   85 no strict 'refs';
  12         24  
  12         625  
630 0   0     0 eval { alarm($child_timeout || 0); $func->(@{ $args }) };
  0         0  
  0         0  
  0         0  
631             }
632             else {
633 12     12   79 no strict 'refs';
  12         447  
  12         2047  
634 10   50     75 @res = eval { alarm($child_timeout || 0); $func->(@{ $args }) };
  10         672  
  10         26  
  10         158  
635             }
636              
637 7         394 alarm 0;
638 7 50       32 $@ = "Child timed out" if $timed_out;
639              
640 7 50       25 if ( $@ ) {
641 0 0       0 _exit($?) if ( $@ =~ /^Child exited \(\S+\)$/ );
642 0         0 my $err = $@; $? = 1; $err =~ s/, <__ANONIO__> line \d+//;
  0         0  
  0         0  
643              
644 0 0       0 if ( ! $_SELF->{IGNORE} ) {
645             $_DATA->{ $_SELF->{PKG} }->set('S'.$$, $err),
646 0         0 $_DATA->{ $_SELF->{PKG} }->set('R'.$$, '');
647             }
648              
649 0 0 0     0 if ( !$timed_out && !$mngd->{on_finish} && !$INC{'MCE/Simple.pm'} ) {
      0        
650 12     12   6324 use bytes; warn "Child $$ terminated abnormally: reason $err\n";
  12         167  
  12         59  
  0         0  
651             }
652             }
653             else {
654 7 50       74 shift(@res) if ref($res[0]) =~ /^MCE::(?:Barrier|Semaphore)::_guard/s;
655             $_DATA->{ $_SELF->{PKG} }->set('R'.$$, @res ? \@res : '')
656 7 50       178 if ( ! $_SELF->{IGNORE} );
    50          
657             }
658              
659 7         44 _exit($?);
660             }
661              
662             sub _exit {
663 10     10   73 my ( $exit_status ) = @_;
664              
665             # Check for nested workers not yet joined.
666 10 50 66     118 MCE::Child->finish('MCE') if ( !$_SELF->{SIGNALED} && keys %{ $_LIST } );
  7         48  
667              
668             # Exit child process.
669 10 50   0   286 $SIG{__DIE__} = sub {} unless $_tid;
670 10     0   210 $SIG{__WARN__} = sub {};
671              
672 10 50 33     83 threads->exit($exit_status) if ( $INC{'threads.pm'} && $_is_MSWin32 );
673              
674             my $posix_exit = ( exists $_SELF->{posix_exit} )
675 10 50       59 ? $_SELF->{posix_exit} : $_MNGD->{ $_SELF->{PKG} }{posix_exit};
676              
677 10 0 33     44 if ( $posix_exit && !$_SELF->{SIGNALED} && !$_is_MSWin32 ) {
      33        
678 0         0 eval { MCE::Mutex::Channel::_destroy() };
  0         0  
679 0 0       0 POSIX::_exit($exit_status) if $INC{'POSIX.pm'};
680 0         0 CORE::kill('KILL', $$);
681             }
682              
683 10         3863 CORE::exit($exit_status);
684             }
685              
686             sub _force_reap {
687 9     9   34 my ( $count, $pkg ) = ( 0, @_ );
688 9 50 33     97 return unless ( exists $_LIST->{$pkg} && $_LIST->{$pkg}->len() );
689              
690 0         0 for my $child ( $_LIST->{$pkg}->vals() ) {
691 0 0       0 next if $child->{IGNORE};
692              
693 0 0       0 if ( $child->is_running() ) {
694 0 0       0 sleep(0.015), CORE::kill('KILL', $child->pid())
695             if CORE::kill('ZERO', $child->pid());
696 0         0 $count++;
697             }
698             }
699              
700 0         0 $_LIST->{$pkg}->clear();
701              
702 0 0 0     0 warn "Finished with active child processes [$pkg] ($count)\n"
703             if ( $count && !$_is_MSWin32 );
704              
705 0         0 return;
706             }
707              
708             sub _quit {
709 3 50   3   18250 return MCE::Signal::defer($_[0]) if $MCE::Signal::IPC;
710              
711 3         19 alarm 0; my ( $name ) = @_;
  3         14  
712 3         69 $_SELF->{SIGNALED} = 1, $name =~ s/^SIG//;
713              
714       0     $SIG{$name} = sub {}, CORE::kill($name, -$$)
715 3 50       156 if ( exists $SIG{$name} );
716              
717 3 50       31 if ( ! $_SELF->{IGNORE} ) {
718 3         11 my ( $pkg, $wrk_id ) = ( $_SELF->{PKG}, $_SELF->{WRK_ID} );
719 3         21 $_DATA->{$pkg}->set('R'.$wrk_id, '');
720             }
721              
722 3         19 _exit(0);
723             }
724              
725             sub _reap_child {
726 47     47   147 my ( $child, $wait_flag ) = @_;
727 47 50       226 return unless $child;
728              
729 47         318 local @_ = $_DATA->{ $child->{PKG} }->get( $child->{WRK_ID}, $wait_flag );
730              
731 47 100 50     1861 ( $child->{ERROR}, $child->{RESULT}, $child->{REAPED} ) =
732             ( pop || '', length $_[0] ? pop : [], 1 );
733              
734 47 50       235 return if $child->{IGNORE};
735              
736 47   50     508 my ( $exit, $err ) = ( $? || 0, $child->{ERROR} );
737 47         144 my ( $code, $sig ) = ( $exit >> 8, $exit & 0x7f );
738              
739 47 50 33     168 if ( $code > 100 && !$err ) {
740 0 0       0 $code = 2, $sig = 1, $err = 'Child received SIGHUP' if $code == 101;
741 0 0       0 $code = 2, $sig = 2, $err = 'Child received SIGINT' if $code == 102;
742 0 0       0 $code = 2, $sig = 6, $err = 'Child received SIGABRT' if $code == 106;
743 0 0       0 $code = 2, $sig = 11, $err = 'Child received SIGSEGV' if $code == 111;
744 0 0       0 $code = 2, $sig = 15, $err = 'Child received SIGTERM' if $code == 115;
745              
746 0         0 $child->{ERROR} = $err;
747             }
748              
749 47 100       217 if ( my $on_finish = $_MNGD->{ $child->{PKG} }{on_finish} ) {
750             $on_finish->(
751             $child->{WRK_ID}, $code, $child->{ident}, $sig, $err,
752 8         36 @{ $child->{RESULT} }
  8         45  
753             );
754             }
755              
756 47         246 return;
757             }
758              
759             sub _trap {
760 0 0   0   0 return MCE::Signal::defer($_[0]) if $MCE::Signal::IPC;
761              
762 0         0 alarm 0; my ( $exit_status, $name ) = ( 2, @_ );
  0         0  
763 0         0 $_SELF->{SIGNALED} = 1, $name =~ s/^SIG//;
764              
765       0     $SIG{$name} = sub {}, CORE::kill($name, -$$)
766 0 0       0 if ( exists $SIG{$name} );
767              
768 0 0       0 if ( $name eq 'HUP' ) { $exit_status = 101 }
  0 0       0  
    0          
    0          
    0          
769 0         0 elsif ( $name eq 'INT' ) { $exit_status = 102 }
770 0         0 elsif ( $name eq 'ABRT' ) { $exit_status = 106 }
771 0         0 elsif ( $name eq 'SEGV' ) { $exit_status = 111 }
772 0         0 elsif ( $name eq 'TERM' ) { $exit_status = 115 }
773              
774 0 0       0 if ( ! $_SELF->{IGNORE} ) {
775 0         0 my ( $pkg, $wrk_id ) = ( $_SELF->{PKG}, $_SELF->{WRK_ID} );
776 0         0 $_DATA->{$pkg}->set('R'.$wrk_id, '');
777             }
778              
779 0         0 _exit($exit_status);
780             }
781              
782             sub _wait_one {
783 4     4   40 my ( $pkg ) = @_;
784 4         40 my ( $list, $self, $wrk_id ) = ( $_LIST->{$pkg} ); local $!;
  4         148  
785              
786 4         48 while () {
787 160         1908 $_DATA->{$pkg}->reap_data;
788 160         760 for my $child ( $list->vals() ) {
789 160         488 $wrk_id = $child->{WRK_ID};
790 160 50       384 return $list->del($wrk_id) if $child->{REAPED};
791 160 100       2768 $self = $list->del($wrk_id), last if waitpid($wrk_id, _WNOHANG);
792             }
793 160 100       520 last if $self;
794 156         4697056 sleep 0.030;
795             }
796              
797 4         60 _reap_child($self, 0);
798              
799 4         24 $self;
800             }
801              
802             ###############################################################################
803             ## ----------------------------------------------------------------------------
804             ## Delay implementation suited for MCE::Child.
805             ##
806             ###############################################################################
807              
808             package # hide from rpm
809             MCE::Child::_delay;
810              
811             sub new {
812 12     12   37 my ( $class, $chnl, $delay ) = @_;
813              
814 12 50       36 if ( !defined $delay ) {
815 12 50       72 $delay = ($^O =~ /mswin|mingw|msys|cygwin/i) ? 0.015 : 0.008;
816             }
817              
818 12         48 $chnl->send(undef);
819              
820 12         59 bless [ $delay, $chnl ], $class;
821             }
822              
823             sub seconds {
824 0     0   0 my ( $self, $how_long ) = @_;
825 0 0       0 my $delay = defined($how_long) ? $how_long : $self->[0];
826 0         0 my $lapse = $self->[1]->recv();
827 0         0 my $time = MCE::Util::_time();
828              
829 0 0 0     0 if ( !$delay || !defined $lapse ) {
    0          
830 0         0 $lapse = $time;
831             }
832             elsif ( $lapse + $delay - $time < 0 ) {
833 0         0 $lapse += int( abs($time - $lapse) / $delay + 0.5 ) * $delay;
834             }
835              
836 0         0 $self->[1]->send( $lapse += $delay );
837              
838 0         0 return $lapse - $time;
839             }
840              
841             ###############################################################################
842             ## ----------------------------------------------------------------------------
843             ## Hash and ordhash implementations suited for MCE::Child.
844             ##
845             ###############################################################################
846              
847             package # hide from rpm
848             MCE::Child::_hash;
849              
850 12     12   15889 use Time::HiRes 'sleep';
  12         23  
  12         74  
851              
852             use constant {
853 12 0       12172 _WNOHANG => ( $INC{'POSIX.pm'} )
    50          
854             ? &POSIX::WNOHANG : ( $^O eq 'solaris' ) ? 64 : 1
855 12     12   1531 };
  12         24  
856              
857             sub new {
858 12     12   47 my ( $class, $chnl ) = @_;
859              
860 12         38 bless [ {}, $chnl ], shift;
861             }
862              
863             sub clear {
864 7     7   21 my ( $self ) = @_;
865              
866 7         30 1 while ( $self->[1]->recv2_nb() );
867              
868 7         18 %{ $self->[0] } = ();
  7         25  
869             }
870              
871             sub exists {
872 20     20   130 my ( $self, $key ) = @_;
873              
874 20         785 while ( my $data = $self->[1]->recv2_nb() ) {
875 20         165 $self->[0]{ $data->[0] } = $data->[1];
876             }
877              
878 20         75710 CORE::exists $self->[0]{ $key };
879             }
880              
881             sub get {
882 47     47   133 my ( $self, $wrk_id, $wait_flag ) = @_;
883              
884 47 100       248 if ( !CORE::exists $self->[0]{ 'R'.$wrk_id } ) {
885 23         123 while ( my $data = $self->[1]->recv2_nb() ) {
886 16         204 $self->[0]{ $data->[0] } = $data->[1];
887             }
888             }
889              
890 47 100       110450 if ( $wait_flag ) {
891 41         554 local $!;
892 41 100       8162675 ( CORE::exists $self->[0]{ 'R'.$wrk_id } ) ? waitpid($wrk_id, 0) : do {
893 9         26 while () {
894 18         82 my $data = $self->[1]->recv2_nb();
895 18 100       70 if ( !defined $data ) {
896 1 50       9 last if waitpid($wrk_id, _WNOHANG);
897 1         1008 sleep(0.0009), next;
898             }
899 17         51 $self->[0]{ $data->[0] } = $data->[1];
900 17 100       10420255 waitpid($wrk_id, 0), last if $data->[0] eq 'R'.$wrk_id;
901             }
902 9 50       288 if ( !CORE::exists $self->[0]{ 'R'.$wrk_id } ) {
903 0         0 while ( my $data = $self->[1]->recv2_nb() ) {
904 0         0 $self->[0]{ $data->[0] } = $data->[1];
905             }
906             }
907             };
908             }
909              
910 47         464 my $result = delete $self->[0]{ 'R'.$wrk_id };
911 47         311 my $error = delete $self->[0]{ 'S'.$wrk_id };
912              
913 47 50       170 $result = '' unless ( defined $result );
914 47 50       131 $error = '' unless ( defined $error );
915              
916 47         252 return ( $result, $error );
917             }
918              
919             sub reap_data {
920 289     289   713 my ( $self ) = @_;
921              
922 289 100       932 if (wantarray) {
923 65         95 my @ret;
924 65         884 while ( my $data = $self->[1]->recv2_nb() ) {
925 19 100       141 push @ret, substr($data->[0], 1) if substr($data->[0], 0, 1) eq 'R';
926 19         251 $self->[0]{ $data->[0] } = $data->[1];
927             }
928 65         926 return @ret;
929             }
930              
931 224         1872 while ( my $data = $self->[1]->recv2_nb() ) {
932 24         636 $self->[0]{ $data->[0] } = $data->[1];
933             }
934              
935 224         460 return;
936             }
937              
938             sub set {
939 20     20   718 $_[0]->[1]->send2([ $_[1], $_[2] ]);
940             }
941              
942             package # hide from rpm
943             MCE::Child::_ordhash;
944              
945 12     12   59 sub new { bless [ {}, [], {}, 0 ], shift; } # data, keys, indx, gcnt
946 0     0   0 sub exists { CORE::exists $_[0]->[0]{ $_[1] }; }
947 0     0   0 sub get { $_[0]->[0]{ $_[1] }; }
948 22     22   113 sub len { scalar keys %{ $_[0]->[0] }; }
  22         205  
949              
950             sub clear {
951 0     0   0 my ( $self ) = @_;
952 0         0 %{ $self->[0] } = @{ $self->[1] } = %{ $self->[2] } = (), $self->[3] = 0;
  0         0  
  0         0  
  0         0  
953              
954 0         0 return;
955             }
956              
957             sub del {
958 47     47   135 my ( $self, $key ) = @_;
959 47 50       262 return undef unless defined( my $off = delete $self->[2]{$key} );
960              
961             # tombstone
962 47         116 $self->[1][$off] = undef;
963              
964             # GC keys and refresh index
965 47 100       87 if ( ++$self->[3] > @{ $self->[1] } * 0.667 ) {
  47         231  
966 18         69 my ( $keys, $indx ) = ( $self->[1], $self->[2] );
967 18         35 my $i; $i = $self->[3] = 0;
  18         36  
968 18         96 for my $k ( @{ $keys } ) {
  18         118  
969 46 50       121 $keys->[$i] = $k, $indx->{$k} = $i++ if defined($k);
970             }
971 18         39 splice @{ $keys }, $i;
  18         63  
972             }
973              
974 47         867 delete $self->[0]{$key};
975             }
976              
977             sub set {
978 55     55   937 my ( $self, $key ) = @_;
979 55 50       684 $self->[0]{$key} = $_[2], return 1 if exists($self->[0]{$key});
980              
981 55         128 $self->[2]{$key} = @{ $self->[1] }; push @{ $self->[1] }, $key;
  55         1739  
  55         150  
  55         742  
982 55         613 $self->[0]{$key} = $_[2];
983              
984 55         197 return 1;
985             }
986              
987             sub vals {
988 193     193   447 my ( $self ) = @_;
989             $self->[3]
990 1         8 ? @{ $self->[0] }{ grep defined($_), @{ $self->[1] } }
  1         17  
991 193 100       649 : @{ $self->[0] }{ @{ $self->[1] } };
  192         892  
  192         368  
992             }
993              
994             1;
995              
996             __END__
997              
998             ###############################################################################
999             ## ----------------------------------------------------------------------------
1000             ## Module usage.
1001             ##
1002             ###############################################################################
1003              
1004             =head1 NAME
1005              
1006             MCE::Child - A threads-like parallelization module compatible with Perl 5.8
1007              
1008             =head1 VERSION
1009              
1010             This document describes MCE::Child version 1.887
1011              
1012             =head1 SYNOPSIS
1013              
1014             use MCE::Child;
1015              
1016             MCE::Child->init(
1017             max_workers => 'auto', # default undef, unlimited
1018              
1019             # Specify a percentage. MCE::Child 1.876+.
1020             max_workers => '25%', # 4 on HW with 16 lcores
1021             max_workers => '50%', # 8 on HW with 16 lcores
1022              
1023             child_timeout => 20, # default undef, no timeout
1024             posix_exit => 1, # default undef, CORE::exit
1025             void_context => 1, # default undef
1026              
1027             on_start => sub {
1028             my ( $pid, $ident ) = @_;
1029             ...
1030             },
1031             on_finish => sub {
1032             my ( $pid, $exit, $ident, $signal, $error, @ret ) = @_;
1033             ...
1034             }
1035             );
1036              
1037             MCE::Child->create( sub { print "Hello from child\n" } )->join();
1038              
1039             sub parallel {
1040             my ($arg1) = @_;
1041             print "Hello again, $arg1\n" if defined($arg1);
1042             print "Hello again, $_\n"; # same thing
1043             }
1044              
1045             MCE::Child->create( \&parallel, $_ ) for 1 .. 3;
1046              
1047             my @procs = MCE::Child->list();
1048             my @pids = MCE::Child->list_pids();
1049             my @running = MCE::Child->list_running();
1050             my @joinable = MCE::Child->list_joinable();
1051             my @count = MCE::Child->pending();
1052              
1053             # Joining is orderly, e.g. child1 is joined first, child2, child3.
1054             $_->join() for @procs; # (or)
1055             $_->join() for @joinable;
1056              
1057             # Joining occurs immediately as child processes complete execution.
1058             1 while MCE::Child->wait_one();
1059              
1060             my $child = mce_child { foreach (@files) { ... } };
1061              
1062             $child->join();
1063              
1064             if ( my $err = $child->error() ) {
1065             warn "Child error: $err\n";
1066             }
1067              
1068             # Get a child's object
1069             $child = MCE::Child->self();
1070              
1071             # Get a child's ID
1072             $pid = MCE::Child->pid(); # $$
1073             $pid = $child->pid();
1074             $pid = MCE::Child->tid(); # tid is an alias for pid
1075             $pid = $child->tid();
1076              
1077             # Test child objects
1078             if ( $child1 == $child2 ) {
1079             ...
1080             }
1081              
1082             # Give other workers a chance to run
1083             MCE::Child->yield();
1084             MCE::Child->yield(0.05);
1085              
1086             # Return context, wantarray aware
1087             my ($value1, $value2) = $child->join();
1088             my $value = $child->join();
1089              
1090             # Check child's state
1091             if ( $child->is_running() ) {
1092             sleep 1;
1093             }
1094             if ( $child->is_joinable() ) {
1095             $child->join();
1096             }
1097              
1098             # Send a signal to a child
1099             $child->kill('SIGUSR1');
1100              
1101             # Exit a child
1102             MCE::Child->exit(0);
1103             MCE::Child->exit(0, @ret);
1104              
1105             =head1 DESCRIPTION
1106              
1107             L<MCE::Child> is a fork of L<MCE::Hobo> for compatibility with Perl 5.8.
1108              
1109             A child is a migratory worker inside the machine that carries the asynchronous
1110             gene. Child processes are equipped with C<threads>-like capability for running
1111             code asynchronously. Unlike threads, each child is a unique process to the
1112             underlying OS. The IPC is handled via C<MCE::Channel>, which runs on all the
1113             major platforms including Cygwin and Strawberry Perl.
1114              
1115             C<MCE::Child> may be used as a standalone or together with C<MCE> including
1116             running alongside C<threads>.
1117              
1118             use MCE::Child;
1119             use MCE::Shared;
1120              
1121             # synopsis: head -20 file.txt | perl script.pl
1122              
1123             my $ifh = MCE::Shared->handle( "<", \*STDIN ); # shared
1124             my $ofh = MCE::Shared->handle( ">", \*STDOUT );
1125             my $ary = MCE::Shared->array();
1126              
1127             sub parallel_task {
1128             my ( $id ) = @_;
1129             while ( <$ifh> ) {
1130             printf {$ofh} "[ %4d ] %s", $., $_;
1131             # $ary->[ $. - 1 ] = "[ ID $id ] read line $.\n" ); # dereferencing
1132             $ary->set( $. - 1, "[ ID $id ] read line $.\n" ); # faster via OO
1133             }
1134             }
1135              
1136             my $child1 = MCE::Child->new( "parallel_task", 1 );
1137             my $child2 = MCE::Child->new( \&parallel_task, 2 );
1138             my $child3 = MCE::Child->new( sub { parallel_task(3) } );
1139              
1140             $_->join for MCE::Child->list(); # ditto: MCE::Child->wait_all();
1141              
1142             # search array (total one round-trip via IPC)
1143             my @vals = $ary->vals( "val =~ / ID 2 /" );
1144              
1145             print {*STDERR} join("", @vals);
1146              
1147             =head1 API DOCUMENTATION
1148              
1149             =over 3
1150              
1151             =item $child = MCE::Child->create( FUNCTION, ARGS )
1152              
1153             =item $child = MCE::Child->new( FUNCTION, ARGS )
1154              
1155             This will create a new child process that will begin execution with function
1156             as the entry point, and optionally ARGS for list of parameters. It will return
1157             the corresponding MCE::Child object, or undef if child creation failed.
1158              
1159             I<FUNCTION> may either be the name of a function, an anonymous subroutine, or
1160             a code ref.
1161              
1162             my $child = MCE::Child->create( "func_name", ... );
1163             # or
1164             my $child = MCE::Child->create( sub { ... }, ... );
1165             # or
1166             my $child = MCE::Child->create( \&func, ... );
1167              
1168             =item $child = MCE::Child->create( { options }, FUNCTION, ARGS )
1169              
1170             =item $child = MCE::Child->create( IDENT, FUNCTION, ARGS )
1171              
1172             Options, excluding C<ident>, may be specified globally via the C<init> function.
1173             Otherwise, C<ident>, C<child_timeout>, C<posix_exit>, and C<void_context> may
1174             be set uniquely.
1175              
1176             The C<ident> option is used by callback functions C<on_start> and C<on_finish>
1177             for identifying the started and finished child process respectively.
1178              
1179             my $child1 = MCE::Child->create( { posix_exit => 1 }, sub {
1180             ...
1181             } );
1182              
1183             $child1->join;
1184              
1185             my $child2 = MCE::Child->create( { child_timeout => 3 }, sub {
1186             sleep 1 for ( 1 .. 9 );
1187             } );
1188              
1189             $child2->join;
1190              
1191             if ( $child2->error() eq "Child timed out\n" ) {
1192             ...
1193             }
1194              
1195             The C<new()> method is an alias for C<create()>.
1196              
1197             =item mce_child { BLOCK } ARGS;
1198              
1199             =item mce_child { BLOCK };
1200              
1201             C<mce_child> runs the block asynchronously similarly to C<< MCE::Child->create() >>.
1202             It returns the child object, or undef if child creation failed.
1203              
1204             my $child = mce_child { foreach (@files) { ... } };
1205              
1206             $child->join();
1207              
1208             if ( my $err = $child->error() ) {
1209             warn("Child error: $err\n");
1210             }
1211              
1212             =item $child->join()
1213              
1214             This will wait for the corresponding child process to complete its execution.
1215             In non-voided context, C<join()> will return the value(s) of the entry point
1216             function.
1217              
1218             The context (void, scalar or list) for the return value(s) for C<join> is
1219             determined at the time of joining and mostly C<wantarray> aware.
1220              
1221             my $child1 = MCE::Child->create( sub {
1222             my @res = qw(foo bar baz);
1223             return (@res);
1224             });
1225              
1226             my @res1 = $child1->join(); # ( foo, bar, baz )
1227             my $res1 = $child1->join(); # baz
1228              
1229             my $child2 = MCE::Child->create( sub {
1230             return 'foo';
1231             });
1232              
1233             my @res2 = $child2->join(); # ( foo )
1234             my $res2 = $child2->join(); # foo
1235              
1236             =item $child1->equal( $child2 )
1237              
1238             Tests if two child objects are the same child or not. Child comparison is based
1239             on process IDs. This is overloaded to the more natural forms.
1240              
1241             if ( $child1 == $child2 ) {
1242             print("Child objects are the same\n");
1243             }
1244             # or
1245             if ( $child1 != $child2 ) {
1246             print("Child objects differ\n");
1247             }
1248              
1249             =item $child->error()
1250              
1251             Child processes are executed in an C<eval> context. This method will return
1252             C<undef> if the child terminates I<normally>. Otherwise, it returns the value
1253             of C<$@> associated with the child's execution status in its C<eval> context.
1254              
1255             =item $child->exit()
1256              
1257             This sends C<'SIGQUIT'> to the child process, notifying the child to exit.
1258             It returns the child object to allow for method chaining. It is important to
1259             join later if not immediately to not leave a zombie or defunct process.
1260              
1261             $child->exit()->join();
1262             ...
1263              
1264             $child->join(); # later
1265              
1266             =item MCE::Child->exit( 0 )
1267              
1268             =item MCE::Child->exit( 0, @ret )
1269              
1270             A child can exit at any time by calling C<< MCE::Child->exit() >>.
1271             Otherwise, the behavior is the same as C<exit(status)> when called from
1272             the main process. The child process may optionally return data, to be
1273             sent via IPC.
1274              
1275             =item MCE::Child->finish()
1276              
1277             This class method is called automatically by C<END>, but may be called
1278             explicitly. An error is emitted via croak if there are active child
1279             processes not yet joined.
1280              
1281             MCE::Child->create( 'task1', $_ ) for 1 .. 4;
1282             $_->join for MCE::Child->list();
1283              
1284             MCE::Child->create( 'task2', $_ ) for 1 .. 4;
1285             $_->join for MCE::Child->list();
1286              
1287             MCE::Child->create( 'task3', $_ ) for 1 .. 4;
1288             $_->join for MCE::Child->list();
1289              
1290             MCE::Child->finish();
1291              
1292             =item MCE::Child->init( options )
1293              
1294             The init function accepts a list of MCE::Child options.
1295              
1296             MCE::Child->init(
1297             max_workers => 'auto', # default undef, unlimited
1298              
1299             # Specify a percentage. MCE::Child 1.876+.
1300             max_workers => '25%', # 4 on HW with 16 lcores
1301             max_workers => '50%', # 8 on HW with 16 lcores
1302              
1303             child_timeout => 20, # default undef, no timeout
1304             posix_exit => 1, # default undef, CORE::exit
1305             void_context => 1, # default undef
1306              
1307             on_start => sub {
1308             my ( $pid, $ident ) = @_;
1309             ...
1310             },
1311             on_finish => sub {
1312             my ( $pid, $exit, $ident, $signal, $error, @ret ) = @_;
1313             ...
1314             }
1315             );
1316              
1317             # Identification given as an option or the 1st argument.
1318              
1319             for my $key ( 'aa' .. 'zz' ) {
1320             MCE::Child->create( { ident => $key }, sub { ... } );
1321             MCE::Child->create( $key, sub { ... } );
1322             }
1323              
1324             MCE::Child->wait_all;
1325              
1326             Set C<max_workers> if you want to limit the number of workers by waiting
1327             automatically for an available slot. Specify a percentage or C<auto> to
1328             obtain the number of logical cores via C<MCE::Util::get_ncpu()>.
1329              
1330             Set C<child_timeout>, in number of seconds, if you want the child process
1331             to terminate after some time. The default is C<0> for no timeout.
1332              
1333             Set C<posix_exit> to avoid all END and destructor processing. Constructing
1334             MCE::Child inside a thread implies 1 or if present CGI, FCGI, Coro, Curses,
1335             Gearman::Util, Gearman::XS, LWP::UserAgent, Mojo::IOLoop, STFL, Tk, Wx,
1336             or Win32::GUI.
1337              
1338             Set C<void_context> to create the child process in void context for the
1339             return value. Otherwise, the return context is wantarray-aware for
1340             C<join()> and C<result()> and determined when retrieving the data.
1341              
1342             The callback options C<on_start> and C<on_finish> are called in the parent
1343             process after starting the worker and later when terminated. The arguments
1344             for the subroutines were inspired by L<Parallel::ForkManager>.
1345              
1346             The parameters for C<on_start> are the following:
1347              
1348             - pid of the child process
1349             - identification (ident option or 1st arg to create)
1350              
1351             The parameters for C<on_finish> are the following:
1352              
1353             - pid of the child process
1354             - program exit code
1355             - identification (ident option or 1st arg to create)
1356             - exit signal id
1357             - error message from eval inside MCE::Child
1358             - returned data
1359              
1360             =item $child->is_running()
1361              
1362             Returns true if a child is still running.
1363              
1364             =item $child->is_joinable()
1365              
1366             Returns true if the child has finished running and not yet joined.
1367              
1368             =item $child->kill( 'SIG...' )
1369              
1370             Sends the specified signal to the child. Returns the child object to allow for
1371             method chaining. As with C<exit>, it is important to join eventually if not
1372             immediately to not leave a zombie or defunct process.
1373              
1374             $child->kill('SIG...')->join();
1375              
1376             The following is a parallel demonstration comparing C<MCE::Shared> against
1377             C<Redis> and C<Redis::Fast> on a Fedora 23 VM. Joining begins after all
1378             workers have been notified to quit.
1379              
1380             use Time::HiRes qw(time);
1381              
1382             use Redis;
1383             use Redis::Fast;
1384              
1385             use MCE::Child;
1386             use MCE::Shared;
1387              
1388             my $redis = Redis->new();
1389             my $rfast = Redis::Fast->new();
1390             my $array = MCE::Shared->array();
1391              
1392             sub parallel_redis {
1393             my ($_redis) = @_;
1394             my ($count, $quit, $len) = (0, 0);
1395              
1396             # instead, use a flag to exit loop
1397             $SIG{'QUIT'} = sub { $quit = 1 };
1398              
1399             while () {
1400             $len = $_redis->rpush('list', $count++);
1401             last if $quit;
1402             }
1403              
1404             $count;
1405             }
1406              
1407             sub parallel_array {
1408             my ($count, $quit, $len) = (0, 0);
1409              
1410             # do not exit from inside handler
1411             $SIG{'QUIT'} = sub { $quit = 1 };
1412              
1413             while () {
1414             $len = $array->push($count++);
1415             last if $quit;
1416             }
1417              
1418             $count;
1419             }
1420              
1421             sub benchmark_this {
1422             my ($desc, $num_procs, $timeout, $code, @args) = @_;
1423             my ($start, $total) = (time(), 0);
1424              
1425             MCE::Child->new($code, @args) for 1..$num_procs;
1426             sleep $timeout;
1427              
1428             # joining is not immediate; ok
1429             $_->kill('QUIT') for MCE::Child->list();
1430              
1431             # joining later; ok
1432             $total += $_->join() for MCE::Child->list();
1433              
1434             printf "$desc <> duration: %0.03f secs, count: $total\n",
1435             time() - $start;
1436              
1437             sleep 0.2;
1438             }
1439              
1440             benchmark_this('Redis ', 8, 5.0, \&parallel_redis, $redis);
1441             benchmark_this('Redis::Fast', 8, 5.0, \&parallel_redis, $rfast);
1442             benchmark_this('MCE::Shared', 8, 5.0, \&parallel_array);
1443              
1444             =item MCE::Child->list()
1445              
1446             Returns a list of all child objects not yet joined.
1447              
1448             @procs = MCE::Child->list();
1449              
1450             =item MCE::Child->list_pids()
1451              
1452             Returns a list of all child pids not yet joined (available since 1.849).
1453              
1454             @pids = MCE::Child->list_pids();
1455              
1456             $SIG{INT} = $SIG{HUP} = $SIG{TERM} = sub {
1457             # Signal workers all at once
1458             CORE::kill('KILL', MCE::Child->list_pids());
1459             exec('reset');
1460             };
1461              
1462             =item MCE::Child->list_running()
1463              
1464             Returns a list of all child objects that are still running.
1465              
1466             @procs = MCE::Child->list_running();
1467              
1468             =item MCE::Child->list_joinable()
1469              
1470             Returns a list of all child objects that have completed running.
1471             Thus, ready to be joined without blocking.
1472              
1473             @procs = MCE::Child->list_joinable();
1474              
1475             =item MCE::Child->max_workers([ N ])
1476              
1477             Getter and setter for max_workers. Specify a number or 'auto' to acquire the
1478             total number of cores via MCE::Util::get_ncpu. Specify a false value to set
1479             back to no limit.
1480              
1481             =item MCE::Child->pending()
1482              
1483             Returns a count of all child objects not yet joined.
1484              
1485             $count = MCE::Child->pending();
1486              
1487             =item $child->result()
1488              
1489             Returns the result obtained by C<join>, C<wait_one>, or C<wait_all>. If the
1490             process has not yet exited, waits for the corresponding child to complete its
1491             execution.
1492              
1493             use MCE::Child;
1494             use Time::HiRes qw(sleep);
1495              
1496             sub task {
1497             my ($id) = @_;
1498             sleep $id * 0.333;
1499             return $id;
1500             }
1501              
1502             MCE::Child->create('task', $_) for ( reverse 1 .. 3 );
1503              
1504             # 1 while MCE::Child->wait_one();
1505              
1506             while ( my $child = MCE::Child->wait_one() ) {
1507             my $err = $child->error() || 'no error';
1508             my $res = $child->result();
1509             my $pid = $child->pid();
1510              
1511             print "[$pid] $err : $res\n";
1512             }
1513              
1514             Like C<join> described above, the context (void, scalar or list) for the
1515             return value(s) is determined at the time C<result> is called and mostly
1516             C<wantarray> aware.
1517              
1518             my $child1 = MCE::Child->create( sub {
1519             my @res = qw(foo bar baz);
1520             return (@res);
1521             });
1522              
1523             my @res1 = $child1->result(); # ( foo, bar, baz )
1524             my $res1 = $child1->result(); # baz
1525              
1526             my $child2 = MCE::Child->create( sub {
1527             return 'foo';
1528             });
1529              
1530             my @res2 = $child2->result(); # ( foo )
1531             my $res2 = $child2->result(); # foo
1532              
1533             =item MCE::Child->self()
1534              
1535             Class method that allows a child to obtain it's own I<MCE::Child> object.
1536              
1537             =item $child->pid()
1538              
1539             =item $child->tid()
1540              
1541             Returns the ID of the child.
1542              
1543             pid: $$ process id
1544             tid: $$ alias for pid
1545              
1546             =item MCE::Child->pid()
1547              
1548             =item MCE::Child->tid()
1549              
1550             Class methods that allows a child to obtain its own ID.
1551              
1552             pid: $$ process id
1553             tid: $$ alias for pid
1554              
1555             =item MCE::Child->wait_one()
1556              
1557             =item MCE::Child->waitone()
1558              
1559             =item MCE::Child->wait_all()
1560              
1561             =item MCE::Child->waitall()
1562              
1563             Meaningful for the manager process only, waits for one or all child processes
1564             to complete execution. Afterwards, returns the corresponding child objects.
1565             If a child doesn't exist, returns the C<undef> value or an empty list for
1566             C<wait_one> and C<wait_all> respectively.
1567              
1568             The C<waitone> and C<waitall> methods are aliases for compatibility with
1569             C<MCE::Hobo>.
1570              
1571             use MCE::Child;
1572             use Time::HiRes qw(sleep);
1573              
1574             sub task {
1575             my $id = shift;
1576             sleep $id * 0.333;
1577             return $id;
1578             }
1579              
1580             MCE::Child->create('task', $_) for ( reverse 1 .. 3 );
1581              
1582             # join, traditional use case
1583             $_->join() for MCE::Child->list();
1584              
1585             # wait_one, simplistic use case
1586             1 while MCE::Child->wait_one();
1587              
1588             # wait_one
1589             while ( my $child = MCE::Child->wait_one() ) {
1590             my $err = $child->error() || 'no error';
1591             my $res = $child->result();
1592             my $pid = $child->pid();
1593              
1594             print "[$pid] $err : $res\n";
1595             }
1596              
1597             # wait_all
1598             my @procs = MCE::Child->wait_all();
1599              
1600             for ( @procs ) {
1601             my $err = $_->error() || 'no error';
1602             my $res = $_->result();
1603             my $pid = $_->pid();
1604              
1605             print "[$pid] $err : $res\n";
1606             }
1607              
1608             =item MCE::Child->yield( [ floating_seconds ] )
1609              
1610             Give other workers a chance to run, optionally for given time. Yield behaves
1611             similarly to MCE's interval option. It throttles workers from running too fast.
1612             A demonstration is provided in the next section for fetching URLs in parallel.
1613              
1614             The default C<floating_seconds> is 0.008 and 0.015 on UNIX and Windows,
1615             respectively. Pass 0 if simply wanting to give other workers a chance to run.
1616              
1617             # total run time: 1.00 second
1618              
1619             MCE::Child->create( sub { MCE::Child->yield(0.25) } ) for 1 .. 4;
1620             MCE::Child->wait_all();
1621              
1622             =back
1623              
1624             =head1 THREADS-like DETACH CAPABILITY
1625              
1626             Threads-like detach capability was added starting with the 1.867 release.
1627              
1628             A threads example is shown first followed by the MCE::Child example. All one
1629             needs to do is set the CHLD signal handler to IGNORE. Unfortunately, this works
1630             on UNIX platforms only. The child process restores the CHLD handler to default,
1631             so is able to deeply spin workers and reap if desired.
1632              
1633             use threads;
1634              
1635             for ( 1 .. 8 ) {
1636             async {
1637             # do something
1638             }->detach;
1639             }
1640              
1641             use MCE::Child;
1642              
1643             # Have the OS reap workers automatically when exiting.
1644             # The on_finish option is ignored if specified (no-op).
1645             # Ensure not inside a thread on UNIX platforms.
1646              
1647             $SIG{CHLD} = 'IGNORE';
1648              
1649             for ( 1 .. 8 ) {
1650             mce_child {
1651             # do something
1652             };
1653             }
1654              
1655             # Optionally, wait for any remaining workers before leaving.
1656             # This is necessary if workers are consuming shared objects,
1657             # constructed via MCE::Shared.
1658              
1659             MCE::Child->wait_all;
1660              
1661             The following is another way and works on Windows.
1662             Here, the on_finish handler works as usual.
1663              
1664             use MCE::Child;
1665              
1666             MCE::Child->init(
1667             on_finish = sub {
1668             ...
1669             },
1670             );
1671              
1672             for ( 1 .. 8 ) {
1673             $_->join for MCE::Child->list_joinable;
1674             mce_child {
1675             # do something
1676             };
1677             }
1678              
1679             MCE::Child->wait_all;
1680              
1681             =head1 PARALLEL::FORKMANAGER-like DEMONSTRATION
1682              
1683             MCE::Child behaves similarly to threads for the most part. It also provides
1684             L<Parallel::ForkManager>-like capabilities. The C<Parallel::ForkManager>
1685             example is shown first followed by a version using C<MCE::Child>.
1686              
1687             =over 3
1688              
1689             =item Parallel::ForkManager
1690              
1691             use strict;
1692             use warnings;
1693              
1694             use Parallel::ForkManager;
1695             use Time::HiRes 'time';
1696              
1697             my $start = time;
1698              
1699             my $pm = Parallel::ForkManager->new(10);
1700             $pm->set_waitpid_blocking_sleep(0);
1701              
1702             $pm->run_on_finish( sub {
1703             my ($pid, $exit_code, $ident, $exit_signal, $core_dumped, $resp) = @_;
1704             print "child $pid completed: $ident => ", $resp->[0], "\n";
1705             });
1706              
1707             DATA_LOOP:
1708             foreach my $data ( 1..2000 ) {
1709             # forks and returns the pid for the child
1710             my $pid = $pm->start($data) and next DATA_LOOP;
1711             my $ret = [ $data * 2 ];
1712              
1713             $pm->finish(0, $ret);
1714             }
1715              
1716             $pm->wait_all_children;
1717              
1718             printf STDERR "duration: %0.03f seconds\n", time - $start;
1719              
1720             =item MCE::Child
1721              
1722             use strict;
1723             use warnings;
1724              
1725             use MCE::Child 1.843;
1726             use Time::HiRes 'time';
1727              
1728             my $start = time;
1729              
1730             MCE::Child->init(
1731             max_workers => 10,
1732             on_finish => sub {
1733             my ($pid, $exit_code, $ident, $exit_signal, $error, $resp) = @_;
1734             print "child $pid completed: $ident => ", $resp->[0], "\n";
1735             }
1736             );
1737              
1738             foreach my $data ( 1..2000 ) {
1739             MCE::Child->create( $data, sub {
1740             [ $data * 2 ];
1741             });
1742             }
1743              
1744             MCE::Child->wait_all;
1745              
1746             printf STDERR "duration: %0.03f seconds\n", time - $start;
1747              
1748             =item Time to spin 2,000 workers and obtain results (in seconds).
1749              
1750             Results were obtained on a Macbook Pro (2.6 GHz ~ 3.6 GHz with Turbo Boost).
1751             Parallel::ForkManager 2.02 uses Moo. Therefore, I ran again with Moo loaded
1752             at the top of the script.
1753              
1754             MCE::Hobo uses MCE::Shared to retrieve data during reaping.
1755             MCE::Child uses MCE::Channel, no shared-manager.
1756              
1757             Version Cygwin Windows Linux macOS FreeBSD
1758              
1759             MCE::Child 1.843 19.099s 17.091s 0.965s 1.534s 1.229s
1760             MCE::Hobo 1.843 20.514s 19.594s 1.246s 1.629s 1.613s
1761             P::FM 1.20 19.703s 19.235s 0.875s 1.445s 1.346s
1762              
1763             MCE::Child 1.843 20.426s 18.417s 1.116s 1.632s 1.338s Moo loaded
1764             MCE::Hobo 1.843 21.809s 20.810s 1.407s 1.759s 1.722s Moo loaded
1765             P::FM 2.02 21.668s 25.927s 1.882s 2.612s 2.483s Moo used
1766              
1767             =item Set posix_exit to avoid all END and destructor processing.
1768              
1769             This is helpful for reducing overhead when workers exit. Ditto if using a Perl
1770             module not parallel safe. The option is ignored on Windows C<$^O eq 'MSWin32'>.
1771              
1772             MCE::Child->init( posix_exit => 1, ... );
1773             MCE::Hobo->init( posix_exit => 1, ... );
1774              
1775             Version Cygwin Windows Linux macOS FreeBSD
1776              
1777             MCE::Child 1.843 19.815s ignored 0.824s 1.284s 1.245s Moo loaded
1778             MCE::Hobo 1.843 21.029s ignored 0.953s 1.335s 1.439s Moo loaded
1779              
1780             =back
1781              
1782             =head1 PARALLEL HTTP GET DEMONSTRATION USING ANYEVENT
1783              
1784             This demonstration constructs two queues, two handles, starts the
1785             shared-manager process if needed, and spawns four workers.
1786             For this demonstration, am chunking 64 URLs per job. In reality,
1787             one may run with 200 workers and chunk 300 URLs on a 24-way box.
1788              
1789             # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
1790             # perl demo.pl -- all output
1791             # perl demo.pl >/dev/null -- mngr/child output
1792             # perl demo.pl 2>/dev/null -- show results only
1793             #
1794             # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
1795              
1796             use strict;
1797             use warnings;
1798              
1799             use AnyEvent;
1800             use AnyEvent::HTTP;
1801             use Time::HiRes qw( time );
1802              
1803             use MCE::Child;
1804             use MCE::Shared;
1805              
1806             # Construct two queues, input and return.
1807              
1808             my $que = MCE::Shared->queue();
1809             my $ret = MCE::Shared->queue();
1810              
1811             # Construct shared handles for serializing output from many workers
1812             # writing simultaneously. This prevents garbled output.
1813              
1814             mce_open my $OUT, ">>", \*STDOUT or die "open error: $!";
1815             mce_open my $ERR, ">>", \*STDERR or die "open error: $!";
1816              
1817             # Spawn workers early for minimum memory consumption.
1818              
1819             MCE::Child->create({ posix_exit => 1 }, 'task', $_) for 1 .. 4;
1820              
1821             # Obtain or generate input data for workers to process.
1822              
1823             my ( $count, @urls ) = ( 0 );
1824              
1825             push @urls, map { "http://127.0.0.$_/" } 1..254;
1826             push @urls, map { "http://192.168.0.$_/" } 1..254; # 508 URLs total
1827              
1828             while ( @urls ) {
1829             my @chunk = splice(@urls, 0, 64);
1830             $que->enqueue( { ID => ++$count, INPUT => \@chunk } );
1831             }
1832              
1833             # So that workers leave the loop after consuming the queue.
1834              
1835             $que->end();
1836              
1837             # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
1838             # Loop for the manager process. The manager may do other work if
1839             # need be and periodically check $ret->pending() not shown here.
1840             #
1841             # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
1842              
1843             my $start = time;
1844              
1845             printf {$ERR} "Mngr - entering loop\n";
1846              
1847             while ( $count ) {
1848             my ( $result, $failed ) = $ret->dequeue( 2 );
1849              
1850             # Remove ID from result, so not treated as a URL item.
1851              
1852             printf {$ERR} "Mngr - received job %s\n", delete $result->{ID};
1853              
1854             # Display the URL and the size captured.
1855              
1856             foreach my $url ( keys %{ $result } ) {
1857             printf {$OUT} "%s: %d\n", $url, length($result->{$url})
1858             if $result->{$url}; # url has content
1859             }
1860              
1861             # Display URLs could not reach.
1862              
1863             if ( @{ $failed } ) {
1864             foreach my $url ( @{ $failed } ) {
1865             print {$OUT} "Failed: $url\n";
1866             }
1867             }
1868              
1869             # Decrement the count.
1870              
1871             $count--;
1872             }
1873              
1874             MCE::Child->wait_all();
1875              
1876             printf {$ERR} "Mngr - exiting loop\n\n";
1877             printf {$ERR} "Duration: %0.3f seconds\n\n", time - $start;
1878              
1879             exit;
1880              
1881             # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
1882             # Child processes enqueue two items ( $result and $failed ) per each
1883             # job for the manager process. Likewise, the manager process dequeues
1884             # two items above. Optionally, child processes may include the ID in
1885             # the result.
1886             #
1887             # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
1888              
1889             sub task {
1890             my ( $id ) = @_;
1891             printf {$ERR} "Child $id entering loop\n";
1892              
1893             while ( my $job = $que->dequeue() ) {
1894             my ( $result, $failed ) = ( { ID => $job->{ID} }, [ ] );
1895              
1896             # Walk URLs, provide a hash and array refs for data.
1897              
1898             printf {$ERR} "Child $id running job $job->{ID}\n";
1899             walk( $job, $result, $failed );
1900              
1901             # Send results to the manager process.
1902              
1903             $ret->enqueue( $result, $failed );
1904             }
1905              
1906             printf {$ERR} "Child $id exiting loop\n";
1907             }
1908              
1909             sub walk {
1910             my ( $job, $result, $failed ) = @_;
1911              
1912             # Yielding is critical when running an event loop in parallel.
1913             # Not doing so means that the app may reach contention points
1914             # with the firewall and likely impose unnecessary hardship at
1915             # the OS level. The idea here is not to have multiple workers
1916             # initiate HTTP requests to a batch of URLs at the same time.
1917             # Yielding behaves similarly like scatter to have the child
1918             # process run solo for a fraction of time.
1919              
1920             MCE::Child->yield( 0.03 );
1921              
1922             my $cv = AnyEvent->condvar();
1923              
1924             # Populate the hash ref for the URLs it could reach.
1925             # Do not mix AnyEvent timeout with child timeout.
1926             # Therefore, choose event timeout when available.
1927              
1928             foreach my $url ( @{ $job->{INPUT} } ) {
1929             $cv->begin();
1930             http_get $url, timeout => 2, sub {
1931             my ( $data, $headers ) = @_;
1932             $result->{$url} = $data;
1933             $cv->end();
1934             };
1935             }
1936              
1937             $cv->recv();
1938              
1939             # Populate the array ref for URLs it could not reach.
1940              
1941             foreach my $url ( @{ $job->{INPUT} } ) {
1942             push @{ $failed }, $url unless (exists $result->{ $url });
1943             }
1944              
1945             return;
1946             }
1947              
1948             __END__
1949              
1950             $ perl demo.pl
1951              
1952             Child 1 entering loop
1953             Child 2 entering loop
1954             Child 3 entering loop
1955             Mngr - entering loop
1956             Child 2 running job 2
1957             Child 3 running job 3
1958             Child 1 running job 1
1959             Child 4 entering loop
1960             Child 4 running job 4
1961             Child 2 running job 5
1962             Mngr - received job 2
1963             Child 3 running job 6
1964             Mngr - received job 3
1965             Child 1 running job 7
1966             Mngr - received job 1
1967             Child 4 running job 8
1968             Mngr - received job 4
1969             http://192.168.0.1/: 3729
1970             Child 2 exiting loop
1971             Mngr - received job 5
1972             Child 3 exiting loop
1973             Mngr - received job 6
1974             Child 1 exiting loop
1975             Mngr - received job 7
1976             Child 4 exiting loop
1977             Mngr - received job 8
1978             Mngr - exiting loop
1979              
1980             Duration: 4.131 seconds
1981              
1982             =head1 CROSS-PLATFORM TEMPLATE FOR BINARY EXECUTABLE
1983              
1984             Making an executable is possible with the L<PAR::Packer> module.
1985             On the Windows platform, threads, threads::shared, and exiting via
1986             threads are necessary for the binary to exit successfully.
1987              
1988             # https://metacpan.org/pod/PAR::Packer
1989             # https://metacpan.org/pod/pp
1990             #
1991             # pp -o demo.exe demo.pl
1992             # ./demo.exe
1993              
1994             use strict;
1995             use warnings;
1996              
1997             use if $^O eq "MSWin32", "threads";
1998             use if $^O eq "MSWin32", "threads::shared";
1999              
2000             # Include minimum dependencies for MCE::Child.
2001             # Add other modules required by your application here.
2002              
2003             use Storable ();
2004             use Time::HiRes ();
2005              
2006             # use IO::FDPass (); # optional: for condvar, handle, queue
2007             # use Sereal (); # optional: for faster serialization
2008              
2009             use MCE::Child;
2010             use MCE::Shared;
2011              
2012             # For PAR to work on the Windows platform, one must include manually
2013             # any shared modules used by the application.
2014              
2015             # use MCE::Shared::Array; # if using MCE::Shared->array
2016             # use MCE::Shared::Cache; # if using MCE::Shared->cache
2017             # use MCE::Shared::Condvar; # if using MCE::Shared->condvar
2018             # use MCE::Shared::Handle; # if using MCE::Shared->handle, mce_open
2019             # use MCE::Shared::Hash; # if using MCE::Shared->hash
2020             # use MCE::Shared::Minidb; # if using MCE::Shared->minidb
2021             # use MCE::Shared::Ordhash; # if using MCE::Shared->ordhash
2022             # use MCE::Shared::Queue; # if using MCE::Shared->queue
2023             # use MCE::Shared::Scalar; # if using MCE::Shared->scalar
2024              
2025             # Et cetera. Only load modules needed for your application.
2026              
2027             use MCE::Shared::Sequence; # if using MCE::Shared->sequence
2028              
2029             my $seq = MCE::Shared->sequence( 1, 9 );
2030              
2031             sub task {
2032             my ( $id ) = @_;
2033             while ( defined ( my $num = $seq->next() ) ) {
2034             print "$id: $num\n";
2035             sleep 1;
2036             }
2037             }
2038              
2039             sub main {
2040             MCE::Child->new( \&task, $_ ) for 1 .. 3;
2041             MCE::Child->wait_all();
2042             }
2043              
2044             # Main must run inside a thread on the Windows platform or workers
2045             # will fail duing exiting, causing the exe to crash. The reason is
2046             # that PAR or a dependency isn't multi-process safe.
2047              
2048             ( $^O eq "MSWin32" ) ? threads->create(\&main)->join() : main();
2049              
2050             threads->exit(0) if $INC{"threads.pm"};
2051              
2052             =head1 LIMITATION
2053              
2054             MCE::Child emits an error when C<is_joinable>, C<is_running>, and C<join> isn't
2055             called by the managed process, where the child was spawned. This is a limitation
2056             in MCE::Child only due to not involving a shared-manager process for IPC.
2057              
2058             This use-case is not typical.
2059              
2060             =head1 CREDITS
2061              
2062             The inspiration for C<MCE::Child> comes from wanting C<threads>-like behavior
2063             for processes compatible with Perl 5.8. Both can run side-by-side including
2064             safe-use by MCE workers. Likewise, the documentation resembles C<threads>.
2065              
2066             The inspiration for C<wait_all> and C<wait_one> comes from the
2067             C<Parallel::WorkUnit> module.
2068              
2069             =head1 SEE ALSO
2070              
2071             =over 3
2072              
2073             =item * L<forks>
2074              
2075             =item * L<forks::BerkeleyDB>
2076              
2077             =item * L<MCE::Hobo>
2078              
2079             =item * L<Parallel::ForkManager>
2080              
2081             =item * L<Parallel::Loops>
2082              
2083             =item * L<Parallel::Prefork>
2084              
2085             =item * L<Parallel::WorkUnit>
2086              
2087             =item * L<Proc::Fork>
2088              
2089             =item * L<Thread::Tie>
2090              
2091             =item * L<threads>
2092              
2093             =back
2094              
2095             =head1 INDEX
2096              
2097             L<MCE|MCE>, L<MCE::Channel>, L<MCE::Shared>
2098              
2099             =head1 AUTHOR
2100              
2101             Mario E. Roy, S<E<lt>marioeroy AT gmail DOT comE<gt>>
2102              
2103             =cut
2104