File Coverage

blib/lib/MCE/Hobo.pm
Criterion Covered Total %
statement 341 510 66.8
branch 162 412 39.3
condition 51 161 31.6
subroutine 56 75 74.6
pod 23 23 100.0
total 633 1181 53.6


line stmt bran cond sub pod time code
1             ###############################################################################
2             ## ----------------------------------------------------------------------------
3             ## A threads-like parallelization module.
4             ##
5             ###############################################################################
6              
7 15     15   1155134 use strict;
  15         70  
  15         453  
8 15     15   75 use warnings;
  15         29  
  15         357  
9              
10 15     15   341 use 5.010001;
  15         46  
11              
12 15     15   85 no warnings qw( threads recursion uninitialized once redefine );
  15         19  
  15         1328  
13              
14             package MCE::Hobo;
15              
16             our $VERSION = '1.886';
17              
18             ## no critic (BuiltinFunctions::ProhibitStringyEval)
19             ## no critic (Subroutines::ProhibitExplicitReturnUndef)
20             ## no critic (Subroutines::ProhibitSubroutinePrototypes)
21             ## no critic (TestingAndDebugging::ProhibitNoStrict)
22              
23 15     15   8334 use MCE::Signal ();
  15         66050  
  15         402  
24 15     15   7082 use MCE::Mutex ();
  15         7284  
  15         320  
25 15     15   7635 use MCE::Channel ();
  15         297369  
  15         390  
26 15     15   95 use Time::HiRes 'sleep';
  15         27  
  15         90  
27              
28             use overload (
29             q(==) => \&equal,
30 0     0   0 q(!=) => sub { !equal(@_) },
31 15         403 fallback => 1
32 15     15   21355 );
  15         15480  
33              
34             sub import {
35 15 50   15   193 if (caller !~ /^MCE::/) {
36 15     15   1815 no strict 'refs'; no warnings 'redefine';
  15     15   31  
  15         462  
  15         73  
  15         30  
  15         1560  
37 15         30 *{ caller().'::mce_async' } = \&mce_async;
  15         87  
38             }
39 15         183 return;
40             }
41              
42             ## The POSIX module has many symbols. Try not loading it simply
43             ## to have WNOHANG. The following covers most platforms.
44              
45             use constant {
46 15 50       80547 _WNOHANG => ( $INC{'POSIX.pm'} )
    50          
47             ? &POSIX::WNOHANG : ( $^O eq 'solaris' ) ? 64 : 1
48 15     15   109 };
  15         30  
49              
50             my ( $_MNGD, $_DATA, $_DELY, $_LIST ) = ( {}, {}, {}, {} );
51              
52             my $_freeze = MCE::Channel::_get_freeze();
53             my $_thaw = MCE::Channel::_get_thaw();
54              
55             my $_is_MSWin32 = ( $^O eq 'MSWin32' ) ? 1 : 0;
56             my $_tid = ( $INC{'threads.pm'} ) ? threads->tid() : 0;
57              
58             sub CLONE {
59 0 0   0   0 $_tid = threads->tid(), &_clear() if $INC{'threads.pm'};
60             }
61              
62             sub _clear {
63 0     0   0 %{ $_LIST } = ();
  0         0  
64             }
65              
66             sub _max_workers {
67 10     10   32 my ( $cpus ) = @_;
68 10 100       80 if ( $cpus eq 'auto' ) {
    100          
69 1         28 $cpus = MCE::Util::get_ncpu();
70             }
71             elsif ( $cpus =~ /^([0-9.]+)%$/ ) {
72 6         47 my ( $percent, $ncpu ) = ( $1 / 100, MCE::Util::get_ncpu() );
73 6         41 $cpus = $ncpu * $percent + 0.5;
74             }
75 10 100 66     176 $cpus = 1 if $cpus !~ /^[\d\.]+$/ || $cpus < 1;
76 10         36 return int($cpus);
77             }
78              
79             ###############################################################################
80             ## ----------------------------------------------------------------------------
81             ## Init routine.
82             ##
83             ###############################################################################
84              
85             bless my $_SELF = { MGR_ID => "$$.$_tid", WRK_ID => $$ }, __PACKAGE__;
86              
87             sub init {
88 22 100 66 22 1 7513 shift if ( defined $_[0] && $_[0] eq __PACKAGE__ );
89              
90             # -- options ----------------------------------------------------------
91             # max_workers hobo_timeout posix_exit on_start on_finish void_context
92             # ---------------------------------------------------------------------
93              
94 22 100       341 my $pkg = "$$.$_tid.".( caller eq __PACKAGE__ ? caller(1) : caller );
95 22 50       263 my $mngd = $_MNGD->{$pkg} = ( ref $_[0] eq 'HASH' ) ? shift : { @_ };
96              
97 22         79 @_ = ();
98              
99             $mngd->{MGR_ID} = "$$.$_tid", $mngd->{PKG} = $pkg,
100 22         328 $mngd->{WRK_ID} = $$;
101              
102 22 100       235 &_force_reap($pkg), $_DATA->{$pkg}->clear() if ( exists $_LIST->{$pkg} );
103              
104 22 100       130 if ( !exists $_LIST->{$pkg} ) {
105 15 0 33     45 $MCE::_GMUTEX->lock() if ( $_tid && $MCE::_GMUTEX );
106 15 50       34 sleep 0.015 if $_tid;
107              
108             # Start the shared-manager process if not running.
109 15 50       314 MCE::Shared->start() if $INC{'MCE/Shared.pm'};
110              
111 15         1067 my $chnl = MCE::Channel->new( impl => 'Mutex' );
112 15         97190 $_LIST->{ $pkg } = MCE::Hobo::_ordhash->new();
113 15         652 $_DELY->{ $pkg } = MCE::Hobo::_delay->new( $chnl );
114 15         377 $_DATA->{ $pkg } = MCE::Hobo::_hash->new();
115 15         747 $_DATA->{"$pkg:seed"} = int(rand() * 1e9);
116 15         66 $_DATA->{"$pkg:id" } = 0;
117              
118 15 0 33     92 $MCE::_GMUTEX->unlock() if ( $_tid && $MCE::_GMUTEX );
119             }
120              
121 22 50       263 if ( !exists $mngd->{posix_exit} ) {
122             $mngd->{posix_exit} = 1 if (
123             $^S || $_tid || $INC{'Mojo/IOLoop.pm'} ||
124             $INC{'Coro.pm'} || $INC{'LWP/UserAgent.pm'} || $INC{'stfl.pm'} ||
125             $INC{'Curses.pm'} || $INC{'CGI.pm'} || $INC{'FCGI.pm'} ||
126             $INC{'Tk.pm'} || $INC{'Wx.pm'} || $INC{'Win32/GUI.pm'} ||
127 22 50 33     1774 $INC{'Gearman/Util.pm'} || $INC{'Gearman/XS.pm'}
      33        
      33        
      33        
      33        
      33        
      33        
      33        
      33        
      33        
      33        
      33        
      33        
128             );
129             }
130              
131 22 100       149 if ( defined $mngd->{max_workers} ) {
132 3         46 $mngd->{max_workers} = _max_workers($mngd->{max_workers});
133             }
134              
135 22 50 33     104 if ( $INC{'LWP/UserAgent.pm'} && !$INC{'Net/HTTP.pm'} ) {
136 0         0 local $@; eval 'require Net::HTTP; require Net::HTTPS';
  0         0  
137             }
138              
139             require POSIX
140 22 50 66     9266 if ( $mngd->{on_finish} && !$INC{'POSIX.pm'} && !$_is_MSWin32 );
      66        
141              
142 22         56402 return;
143             }
144              
145             ###############################################################################
146             ## ----------------------------------------------------------------------------
147             ## 'new', 'mce_async', and 'create' for threads-like similarity.
148             ##
149             ###############################################################################
150              
151             ## 'new' and 'tid' are aliases for 'create' and 'pid' respectively.
152              
153             *new = \&create, *tid = \&pid;
154              
155             ## Use "goto" trick to avoid pad problems from 5.8.1 (fixed in 5.8.2)
156             ## Tip found in threads::async.
157              
158             sub mce_async (&;@) {
159 0     0 1 0 goto &create;
160             }
161              
162             sub create {
163 70   66 70 1 149895 my $mngd = $_MNGD->{ "$$.$_tid.".caller() } || do {
164             # construct mngd internally on first use unless defined
165             init(); $_MNGD->{ "$$.$_tid.".caller() };
166             };
167              
168 70 50       917 shift if ( $_[0] eq __PACKAGE__ );
169              
170             # ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~
171              
172 70 50       1245 my $self = bless ref $_[0] eq 'HASH' ? { %{ shift() } } : { }, __PACKAGE__;
  0         0  
173              
174 70 50       517 $self->{IGNORE} = 1 if $SIG{CHLD} eq 'IGNORE';
175 70         1371 $self->{MGR_ID} = $mngd->{MGR_ID}, $self->{PKG} = $mngd->{PKG};
176 70 50 33     356 $self->{ident } = shift if ( !ref $_[0] && ref $_[1] eq 'CODE' );
177              
178 70 0 33     158 my $func = shift; $func = caller().'::'.$func
  70   33     249  
179             if ( !ref $func && length $func && index($func,':') < 0 );
180              
181 70 50       217 if ( !defined $func ) {
182 0         0 local $\; print {*STDERR} "code function is not specified or valid\n";
  0         0  
  0         0  
183 0         0 return undef;
184             }
185              
186             my ( $list, $max_workers, $pkg ) = (
187             $_LIST->{ $mngd->{PKG} }, $mngd->{max_workers}, $mngd->{PKG}
188 70         350 );
189              
190 70 50       894 $_DATA->{"$pkg:id"} = 10000 if ( ( my $id = ++$_DATA->{"$pkg:id"} ) >= 2e9 );
191              
192 70 50 33     1252 if ( $max_workers || $self->{IGNORE} ) {
193 0         0 my $wrk_id; local $!;
  0         0  
194              
195             # Reap completed hobo processes.
196 0         0 for my $hobo ( $list->vals() ) {
197 0         0 $wrk_id = $hobo->{WRK_ID};
198 0 0       0 $list->del($wrk_id), next if $hobo->{REAPED};
199 0 0       0 waitpid($wrk_id, _WNOHANG) or next;
200 0         0 _reap_hobo($list->del($wrk_id), 0);
201             }
202              
203             # Wait for a slot if saturated.
204 0 0 0     0 if ( $max_workers && $list->len() >= $max_workers ) {
205 0         0 my $count = $list->len() - $max_workers + 1;
206 0         0 _wait_one($pkg) for 1 .. $count;
207             }
208             }
209              
210             # ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~
211              
212 70 0 33     287 $MCE::_GMUTEX->lock() if ( $_tid && $MCE::_GMUTEX );
213              
214 70         333 my @args = @_; @_ = (); # To avoid (Scalars leaked: N) messages
  70         218  
215 70         251 my ( $killed, $pid );
216              
217             {
218 70     0   172 local $SIG{TERM} = local $SIG{INT} = sub { $killed = $_[0] }
  0         0  
219 70 50 33     4433 if ( !$_is_MSWin32 && $] ge '5.010001' );
220              
221             local $SIG{TTIN}, local $SIG{TTOU}, local $SIG{WINCH}
222 70 50       2976 if ( !$_is_MSWin32 );
223              
224 70         94195 $pid = fork();
225              
226 70 50       5251 if ( !defined $pid ) { # error
    100          
227 0         0 local $\; print {*STDERR} "fork error: $!\n";
  0         0  
  0         0  
228             }
229             elsif ( $pid ) { # parent
230 58         2861 $self->{WRK_ID} = $pid;
231 58         3108 $list->set($pid, $self);
232 58 100       10549 $mngd->{on_start}->($pid, $self->{ident}) if $mngd->{on_start};
233             }
234             else { # child
235 12         695 %{ $_LIST } = (), $_SELF = $self;
  12         1842  
236              
237             local $SIG{TERM} = local $SIG{INT} = local $SIG{ABRT} = \&_trap,
238             local $SIG{SEGV} = local $SIG{HUP} = \&_trap,
239 12         2336 local $SIG{QUIT} = \&_quit;
240 12         341 local $SIG{CHLD};
241              
242 12 50       1718 MCE::Shared::init() if $INC{'MCE/Shared.pm'};
243 12 50       714 $_DATA->{ $_SELF->{PKG} }->set('S'.$$, '') unless $self->{IGNORE};
244 12 50       75 CORE::kill($killed, $$) if $killed;
245              
246             # Sets the seed of the base generator uniquely between workers.
247             # The new seed is computed using the current seed and ID value.
248             # One may set the seed at the application level for predictable
249             # results. Ditto for Math::Prime::Util, Math::Random, and
250             # Math::Random::MT::Auto.
251              
252 12         156 srand( abs($_DATA->{"$pkg:seed"} - ($id * 100000)) % 2147483560 );
253              
254 12 50       156 if ( $INC{'Math/Prime/Util.pm'} ) {
255             Math::Prime::Util::srand(
256 0         0 abs($_DATA->{"$pkg:seed"} - ($id * 100000)) % 2147483560
257             );
258             }
259              
260 12 50       104 if ( $INC{'Math/Random.pm'} ) {
261 0         0 my $cur_seed = Math::Random::random_get_seed();
262 0 0       0 my $new_seed = ($cur_seed < 1073741781)
263             ? $cur_seed + ((abs($id) * 10000) % 1073741780)
264             : $cur_seed - ((abs($id) * 10000) % 1073741780);
265              
266 0         0 Math::Random::random_set_seed($new_seed, $new_seed);
267             }
268              
269 12 50       144 if ( $INC{'Math/Random/MT/Auto.pm'} ) {
270 0         0 my $cur_seed = Math::Random::MT::Auto::get_seed()->[0];
271 0 0       0 my $new_seed = ($cur_seed < 1073741781)
272             ? $cur_seed + ((abs($id) * 10000) % 1073741780)
273             : $cur_seed - ((abs($id) * 10000) % 1073741780);
274              
275 0         0 Math::Random::MT::Auto::set_seed($new_seed);
276             }
277              
278 12         492 _dispatch($mngd, $func, \@args);
279             }
280             }
281              
282 58 0 33     3075 $MCE::_GMUTEX->unlock() if ( $_tid && $MCE::_GMUTEX );
283              
284 58 50       261 CORE::kill($killed, $$) if $killed;
285              
286 58 50       7733 return $pid ? $self : undef;
287             }
288              
289             ###############################################################################
290             ## ----------------------------------------------------------------------------
291             ## Public methods.
292             ##
293             ###############################################################################
294              
295             sub equal {
296 0 0 0 0 1 0 return 0 unless ( ref $_[0] && ref $_[1] );
297 0 0       0 $_[0]->{WRK_ID} == $_[1]->{WRK_ID} ? 1 : 0;
298             }
299              
300             sub error {
301 31 50   31 1 434 _croak('Usage: $hobo->error()') unless ref( my $self = $_[0] );
302 31 50       119 $self->join() unless $self->{REAPED};
303 31 50       417 $self->{ERROR} || undef;
304             }
305              
306             sub exit {
307 10 50 33 10 1 530 shift if ( defined $_[0] && $_[0] eq __PACKAGE__ );
308              
309 10 50       245 my ( $self ) = ( ref $_[0] ? shift : $_SELF );
310 10         135 my ( $pkg, $wrk_id ) = ( $self->{PKG}, $self->{WRK_ID} );
311              
312 10 50 33     230 if ( $wrk_id == $$ && $self->{MGR_ID} eq "$$.$_tid" ) {
    50          
313 0         0 MCE::Hobo->finish('MCE'); CORE::exit(@_);
  0         0  
314             }
315             elsif ( $wrk_id == $$ ) {
316 0   0     0 alarm 0; my ( $exit_status, @res ) = @_; $? = $exit_status || 0;
  0         0  
  0         0  
317 0 0       0 $_DATA->{$pkg}->set('R'.$wrk_id, @res ? $_freeze->(\@res) : '');
318 0         0 die "Hobo exited ($?)\n";
319 0         0 _exit($?); # not reached
320             }
321              
322 10 50       135 return $self if $self->{REAPED};
323              
324 10 50       135 if ( exists $_DATA->{$pkg} ) {
325 10         375 sleep 0.015 until $_DATA->{$pkg}->exists('S'.$wrk_id);
326             } else {
327 0         0 sleep 0.030;
328             }
329              
330 10 50       35 if ($_is_MSWin32) {
331 0 0       0 CORE::kill('KILL', $wrk_id) if CORE::kill('ZERO', $wrk_id);
332             } else {
333 10 50       525 CORE::kill('QUIT', $wrk_id) if CORE::kill('ZERO', $wrk_id);
334             }
335              
336 10         65 $self;
337             }
338              
339             sub finish {
340 7 50   7 1 3728 _croak('Usage: MCE::Hobo->finish()') if ref($_[0]);
341 7 50 33     80 shift if ( defined $_[0] && $_[0] eq __PACKAGE__ );
342              
343 7 100       36 my $pkg = defined($_[0]) ? $_[0] : caller();
344              
345 7 100       98 if ( $pkg eq 'MCE' ) {
    100          
346 3         7 for my $key ( keys %{ $_LIST } ) { MCE::Hobo->finish($key); }
  3         38  
  3         68  
347             }
348             elsif ( exists $_LIST->{$pkg} ) {
349 3 50       39 return if $MCE::Signal::KILLED;
350              
351 3 50       21 if ( exists $_DELY->{$pkg} ) {
352 3         36 &_force_reap($pkg);
353             delete($_DELY->{$pkg}), delete($_DATA->{"$pkg:seed"}),
354             delete($_LIST->{$pkg}), delete($_DATA->{"$pkg:id"}),
355 3         161 delete($_MNGD->{$pkg}), delete($_DATA->{ $pkg });
356             }
357             }
358              
359 7         30 @_ = ();
360              
361 7         32 return;
362             }
363              
364             sub is_joinable {
365 24 50   24 1 88 _croak('Usage: $hobo->is_joinable()') unless ref( my $self = $_[0] );
366 24         80 my ( $wrk_id, $pkg ) = ( $self->{WRK_ID}, $self->{PKG} );
367              
368 24 50       128 if ( $wrk_id == $$ ) {
    50          
369 0         0 '';
370             }
371             elsif ( $self->{MGR_ID} eq "$$.$_tid" ) {
372 24 50       80 return '' if $self->{REAPED};
373 24         72 local $!;
374 24 50       440 ( waitpid($wrk_id, _WNOHANG) == 0 ) ? '' : do {
375 0 0       0 _reap_hobo($self, 0) unless $self->{REAPED};
376 0         0 1;
377             };
378             }
379             else {
380             _croak('Error: $hobo->is_joinable() not called by managed process')
381 0 0       0 if ( $self->{IGNORE} );
382              
383 0 0       0 return '' if $self->{REAPED};
384 0 0       0 $_DATA->{$pkg}->exists('R'.$wrk_id) ? 1 : '';
385             }
386             }
387              
388             sub is_running {
389 24 50   24 1 8792 _croak('Usage: $hobo->is_running()') unless ref( my $self = $_[0] );
390 24         72 my ( $wrk_id, $pkg ) = ( $self->{WRK_ID}, $self->{PKG} );
391              
392 24 50       136 if ( $wrk_id == $$ ) {
    50          
393 0         0 1;
394             }
395             elsif ( $self->{MGR_ID} eq "$$.$_tid" ) {
396 24 50       64 return '' if $self->{REAPED};
397 24         80 local $!;
398 24 50       352 ( waitpid($wrk_id, _WNOHANG) == 0 ) ? 1 : do {
399 0 0       0 _reap_hobo($self, 0) unless $self->{REAPED};
400 0         0 '';
401             };
402             }
403             else {
404             _croak('Error: $hobo->is_running() not called by managed process')
405 0 0       0 if ( $self->{IGNORE} );
406              
407 0 0       0 return '' if $self->{REAPED};
408 0 0       0 $_DATA->{$pkg}->exists('R'.$wrk_id) ? '' : 1;
409             }
410             }
411              
412             sub join {
413 45 50   45 1 32495 _croak('Usage: $hobo->join()') unless ref( my $self = $_[0] );
414 45         199 my ( $wrk_id, $pkg ) = ( $self->{WRK_ID}, $self->{PKG} );
415              
416 45 50       159 if ( $self->{REAPED} ) {
417 0 0       0 _croak('Hobo already joined') unless exists( $self->{RESULT} );
418 0 0       0 $_LIST->{$pkg}->del($wrk_id) if ( exists $_LIST->{$pkg} );
419              
420             return ( defined wantarray )
421 0 0       0 ? wantarray ? @{ delete $self->{RESULT} } : delete( $self->{RESULT} )->[-1]
  0 0       0  
422             : ();
423             }
424              
425 45 50       386 if ( $wrk_id == $$ ) {
    50          
426 0         0 _croak('Cannot join self');
427             }
428             elsif ( $self->{MGR_ID} eq "$$.$_tid" ) {
429             # remove from list after reaping
430 45 50       412 if ( $_tid ) {
431 0         0 local $SIG{CHLD};
432 0         0 _reap_hobo($self, 1);
433 0         0 $_LIST->{$pkg}->del($wrk_id);
434             }
435             else {
436 45         1052 local ($SIG{CHLD}, $!);
437 45         31024541 waitpid($wrk_id, 0);
438 45         793 _reap_hobo($self, 0);
439 45         306 $_LIST->{$pkg}->del($wrk_id);
440             }
441             }
442             else {
443             _croak('Error: $hobo->join() not called by managed process')
444 0 0       0 if ( $self->{IGNORE} );
445              
446 0         0 sleep 0.3 until ( $_DATA->{$pkg}->exists('R'.$wrk_id) );
447 0         0 _reap_hobo($self, 0);
448             }
449              
450 45 50       259 return unless ( exists $self->{RESULT} );
451              
452             ( defined wantarray )
453 45 50       790 ? wantarray ? @{ delete $self->{RESULT} } : delete( $self->{RESULT} )->[-1]
  0 100       0  
454             : ();
455             }
456              
457             sub kill {
458 5 50   5 1 240 _croak('Usage: $hobo->kill()') unless ref( my $self = $_[0] );
459 5         65 my ( $wrk_id, $pkg, $signal ) = ( $self->{WRK_ID}, $self->{PKG}, $_[1] );
460              
461 5 50       35 if ( $wrk_id == $$ ) {
462 0   0     0 CORE::kill($signal || 'INT', $$);
463 0         0 return $self;
464             }
465 5 50       60 if ( $self->{MGR_ID} eq "$$.$_tid" ) {
466 5 50       25 return $self if $self->{REAPED};
467 5 50       20 if ( exists $_DATA->{$pkg} ) {
468 5         30 sleep 0.015 until $_DATA->{$pkg}->exists('S'.$wrk_id);
469             } else {
470 0         0 sleep 0.030;
471             }
472             }
473              
474 5 50 50     235 CORE::kill($signal || 'INT', $wrk_id) if CORE::kill('ZERO', $wrk_id);
475              
476 5         40 $self;
477             }
478              
479             sub list {
480 8 50   8 1 4664 _croak('Usage: MCE::Hobo->list()') if ref($_[0]);
481 8         56 my $pkg = "$$.$_tid.".caller();
482              
483 8 50       96 ( exists $_LIST->{$pkg} ) ? $_LIST->{$pkg}->vals() : ();
484             }
485              
486             sub list_pids {
487 8 50   8 1 616 _croak('Usage: MCE::Hobo->list_pids()') if ref($_[0]);
488 8         512 my $pkg = "$$.$_tid.".caller(); local $_;
  8         208  
489              
490 8 50       816 ( exists $_LIST->{$pkg} ) ? map { $_->pid } $_LIST->{$pkg}->vals() : ();
  24         392  
491             }
492              
493             sub list_joinable {
494 8 50   8 1 5152 _croak('Usage: MCE::Hobo->list_joinable()') if ref($_[0]);
495 8         1000 my $pkg = "$$.$_tid.".caller();
496              
497 8 50       624 return () unless ( my $list = $_LIST->{$pkg} );
498 8         72 local ($!, $?, $_);
499              
500             map {
501 8 50       32 ( waitpid($_->{WRK_ID}, _WNOHANG) == 0 ) ? () : do {
  24         232  
502 0 0       0 _reap_hobo($_, 0) unless $_->{REAPED};
503 0         0 $_;
504             };
505             }
506             $list->vals();
507             }
508              
509             sub list_running {
510 8 50   8 1 18600 _croak('Usage: MCE::Hobo->list_running()') if ref($_[0]);
511 8         64 my $pkg = "$$.$_tid.".caller();
512              
513 8 50       136 return () unless ( my $list = $_LIST->{$pkg} );
514 8         224 local ($!, $?, $_);
515              
516             map {
517 8 50       64 ( waitpid($_->{WRK_ID}, _WNOHANG) == 0 ) ? $_ : do {
  24         392  
518 0 0       0 _reap_hobo($_, 0) unless $_->{REAPED};
519 0         0 ();
520             };
521             }
522             $list->vals();
523             }
524              
525             sub max_workers {
526 17 50   17 1 118 _croak('Usage: MCE::Hobo->max_workers()') if ref($_[0]);
527 17   33     121 my $mngd = $_MNGD->{ "$$.$_tid.".caller() } || do {
528             # construct mngd internally on first use unless defined
529             init(); $_MNGD->{ "$$.$_tid.".caller() };
530             };
531 17 50       60 shift if ( $_[0] eq __PACKAGE__ );
532              
533 17 100       56 $mngd->{max_workers} = _max_workers(shift) if @_;
534 17         138 $mngd->{max_workers};
535             }
536              
537             sub pending {
538 8 50   8 1 4504 _croak('Usage: MCE::Hobo->pending()') if ref($_[0]);
539 8         152 my $pkg = "$$.$_tid.".caller();
540              
541 8 50       280 ( exists $_LIST->{$pkg} ) ? $_LIST->{$pkg}->len() : 0;
542             }
543              
544             sub pid {
545 31 50   31 1 520 ref($_[0]) ? $_[0]->{WRK_ID} : $_SELF->{WRK_ID};
546             }
547              
548             sub result {
549 7 50   7 1 119 _croak('Usage: $hobo->result()') unless ref( my $self = $_[0] );
550 7 50       33 return $self->join() unless $self->{REAPED};
551              
552 7 50       47 _croak('Hobo already joined') unless exists( $self->{RESULT} );
553 7 50       95 wantarray ? @{ delete $self->{RESULT} } : delete( $self->{RESULT} )->[-1];
  0         0  
554             }
555              
556             sub self {
557 0 0   0 1 0 ref($_[0]) ? $_[0] : $_SELF;
558             }
559              
560             sub wait_all {
561 1 50   1 1 70 _croak('Usage: MCE::Hobo->wait_all()') if ref($_[0]);
562 1         50 my $pkg = "$$.$_tid.".caller();
563              
564             return wantarray ? () : 0
565 1 0 33     65 if ( !exists $_LIST->{$pkg} || !$_LIST->{$pkg}->len() );
    50          
566              
567 1         20 local $_; ( wantarray )
568 0         0 ? map { $_->join(); $_ } $_LIST->{$pkg}->vals()
  0         0  
569 1 50       15 : map { $_->join(); () } $_LIST->{$pkg}->vals();
  3         40  
  3         15  
570             }
571              
572             *waitall = \&wait_all; # compatibility
573              
574             sub wait_one {
575 4 50   4 1 324 _croak('Usage: MCE::Hobo->wait_one()') if ref($_[0]);
576 4         192 my $pkg = "$$.$_tid.".caller();
577              
578             return undef
579 4 50 33     192 if ( !exists $_LIST->{$pkg} || !$_LIST->{$pkg}->len() );
580              
581 4         132 _wait_one($pkg);
582             }
583              
584             *waitone = \&wait_one; # compatibility
585              
586             sub yield {
587 0 0   0 1 0 _croak('Usage: MCE::Hobo->yield()') if ref($_[0]);
588 0 0 0     0 shift if ( defined $_[0] && $_[0] eq __PACKAGE__ );
589              
590 0   0     0 my $pkg = $_SELF->{PKG} || do {
591             my $mngd = $_MNGD->{ "$$.$_tid.".caller() } || do {
592             # construct mngd internally on first use unless defined
593             init(); $_MNGD->{ "$$.$_tid.".caller() };
594             };
595             $mngd->{PKG};
596             };
597              
598 0 0       0 return unless $_DELY->{$pkg};
599 0         0 my $seconds = $_DELY->{$pkg}->seconds(@_);
600              
601 0         0 MCE::Util::_sleep( $seconds );
602             }
603              
604             ###############################################################################
605             ## ----------------------------------------------------------------------------
606             ## Private methods.
607             ##
608             ###############################################################################
609              
610             sub _croak {
611 0 0   0   0 if ( $INC{'MCE.pm'} ) {
612 0         0 goto &MCE::_croak;
613             }
614             else {
615 0         0 $SIG{__DIE__} = \&MCE::Signal::_die_handler;
616 0         0 $SIG{__WARN__} = \&MCE::Signal::_warn_handler;
617              
618 0         0 $\ = undef; goto &Carp::croak;
  0         0  
619             }
620             }
621              
622             sub _dispatch {
623 12     12   151 my ( $mngd, $func, $args ) = @_;
624              
625 12         367 $mngd->{WRK_ID} = $_SELF->{WRK_ID} = $$, $? = 0;
626 12 50       140 $ENV{PERL_MCE_IPC} = 'win32' if $_is_MSWin32;
627              
628             {
629 12         31 local $!;
  12         422  
630 12 50       672 (*STDERR)->autoflush(1) if defined( fileno *STDERR );
631 12 50       3762 (*STDOUT)->autoflush(1) if defined( fileno *STDOUT );
632             }
633              
634             # Run task.
635             my $hobo_timeout = ( exists $_SELF->{hobo_timeout} )
636 12 50       719 ? $_SELF->{hobo_timeout} : $mngd->{hobo_timeout};
637              
638             my $void_context = ( exists $_SELF->{void_context} )
639 12 50       261 ? $_SELF->{void_context} : $mngd->{void_context};
640              
641 12         47 my @res; my $timed_out = 0;
  12         48  
642              
643             local $SIG{'ALRM'} = sub {
644 0     0   0 alarm 0; $timed_out = 1; $SIG{__WARN__} = sub {};
  0         0  
  0         0  
645 0         0 die "Hobo timed out\n";
646 12         1002 };
647              
648 11 50 33     400 if ( $void_context || $_SELF->{IGNORE} ) {
649 15     15   135 no strict 'refs';
  15         20  
  15         994  
650 0   0     0 eval { alarm($hobo_timeout || 0); $func->(@{ $args }) };
  0         0  
  0         0  
  0         0  
651             }
652             else {
653 15     15   103 no strict 'refs';
  15         30  
  15         3451  
654 11   50     201 @res = eval { alarm($hobo_timeout || 0); $func->(@{ $args }) };
  11         1112  
  11         46  
  11         240  
655             }
656              
657 9         26893 alarm 0;
658 9 50       91 $@ = "Hobo timed out" if $timed_out;
659              
660 9 50       57 if ( $@ ) {
661 0 0       0 _exit($?) if ( $@ =~ /^Hobo exited \(\S+\)$/ );
662 0         0 my $err = $@; $? = 1; $err =~ s/, <__ANONIO__> line \d+//;
  0         0  
  0         0  
663              
664 0 0       0 if ( ! $_SELF->{IGNORE} ) {
665             $_DATA->{ $_SELF->{PKG} }->set('S'.$$, $err),
666 0         0 $_DATA->{ $_SELF->{PKG} }->set('R'.$$, '');
667             }
668              
669 0 0 0     0 if ( !$timed_out && !$mngd->{on_finish} && !$INC{'MCE/Simple.pm'} ) {
      0        
670 15     15   9675 use bytes; warn "Hobo $$ terminated abnormally: reason $err\n";
  15         239  
  15         75  
  0         0  
671             }
672             }
673             else {
674 9 50       92 shift(@res) if ref($res[0]) =~ /^MCE::(?:Barrier|Semaphore)::_guard/s;
675             $_DATA->{ $_SELF->{PKG} }->set('R'.$$, @res ? $_freeze->(\@res) : '')
676 9 50       426 if ( ! $_SELF->{IGNORE} );
    50          
677             }
678              
679 9         182 _exit($?);
680             }
681              
682             sub _exit {
683 12     12   57 my ( $exit_status ) = @_;
684              
685             # Check for nested workers not yet joined.
686 12 50 66     134 MCE::Hobo->finish('MCE') if ( !$_SELF->{SIGNALED} && keys %{ $_LIST } );
  9         120  
687              
688             # Exit hobo process.
689 12 50   0   530 $SIG{__DIE__} = sub {} unless $_tid;
690 12     0   277 $SIG{__WARN__} = sub {};
691              
692 12 50 33     137 threads->exit($exit_status) if ( $INC{'threads.pm'} && $_is_MSWin32 );
693              
694             my $posix_exit = ( exists $_SELF->{posix_exit} )
695 12 50       119 ? $_SELF->{posix_exit} : $_MNGD->{ $_SELF->{PKG} }{posix_exit};
696              
697 12 0 33     83 if ( $posix_exit && !$_SELF->{SIGNALED} && !$_is_MSWin32 ) {
      33        
698 0         0 eval { MCE::Mutex::Channel::_destroy() };
  0         0  
699 0 0       0 POSIX::_exit($exit_status) if $INC{'POSIX.pm'};
700 0         0 CORE::kill('KILL', $$);
701             }
702              
703 12         5724 CORE::exit($exit_status);
704             }
705              
706             sub _force_reap {
707 10     10   84 my ( $count, $pkg ) = ( 0, @_ );
708 10 50 33     288 return unless ( exists $_LIST->{$pkg} && $_LIST->{$pkg}->len() );
709              
710 0         0 for my $hobo ( $_LIST->{$pkg}->vals() ) {
711 0 0       0 next if $hobo->{IGNORE};
712              
713 0 0       0 if ( $hobo->is_running() ) {
714 0 0       0 sleep(0.015), CORE::kill('KILL', $hobo->pid())
715             if CORE::kill('ZERO', $hobo->pid());
716 0         0 $count++;
717             }
718             }
719              
720 0         0 $_LIST->{$pkg}->clear();
721              
722 0 0 0     0 warn "Finished with active hobo processes [$pkg] ($count)\n"
723             if ( $count && !$_is_MSWin32 );
724              
725 0         0 return;
726             }
727              
728             sub _quit {
729 3 50   3   3035 return MCE::Signal::defer($_[0]) if $MCE::Signal::IPC;
730              
731 3         22 alarm 0; my ( $name ) = @_;
  3         18  
732 3         224 $_SELF->{SIGNALED} = 1, $name =~ s/^SIG//;
733              
734       0     $SIG{$name} = sub {}, CORE::kill($name, -$$)
735 3 50       111 if ( exists $SIG{$name} );
736              
737 3 50       24 if ( ! $_SELF->{IGNORE} ) {
738 3         29 my ( $pkg, $wrk_id ) = ( $_SELF->{PKG}, $_SELF->{WRK_ID} );
739 3         20 $_DATA->{$pkg}->set('R'.$wrk_id, '');
740             }
741              
742 3         45 _exit(0);
743             }
744              
745             sub _reap_hobo {
746 49     49   226 my ( $hobo, $wait_flag ) = @_;
747 49 50       439 return unless $hobo;
748              
749 49         711 local @_ = $_DATA->{ $hobo->{PKG} }->get($hobo->{WRK_ID}, $wait_flag);
750              
751 49 100 50     2647 ( $hobo->{ERROR}, $hobo->{RESULT}, $hobo->{REAPED} ) =
752             ( pop || '', length $_[0] ? $_thaw->(pop) : [], 1 );
753              
754 49 50       284 return if $hobo->{IGNORE};
755              
756 49   50     748 my ( $exit, $err ) = ( $? || 0, $hobo->{ERROR} );
757 49         241 my ( $code, $sig ) = ( $exit >> 8, $exit & 0x7f );
758              
759 49 50 33     253 if ( $code > 100 && !$err ) {
760 0 0       0 $code = 2, $sig = 1, $err = 'Hobo received SIGHUP' if $code == 101;
761 0 0       0 $code = 2, $sig = 2, $err = 'Hobo received SIGINT' if $code == 102;
762 0 0       0 $code = 2, $sig = 6, $err = 'Hobo received SIGABRT' if $code == 106;
763 0 0       0 $code = 2, $sig = 11, $err = 'Hobo received SIGSEGV' if $code == 111;
764 0 0       0 $code = 2, $sig = 15, $err = 'Hobo received SIGTERM' if $code == 115;
765              
766 0         0 $hobo->{ERROR} = $err;
767             }
768              
769 49 100       286 if ( my $on_finish = $_MNGD->{ $hobo->{PKG} }{on_finish} ) {
770             $on_finish->(
771             $hobo->{WRK_ID}, $code, $hobo->{ident}, $sig, $err,
772 7         109 @{ $hobo->{RESULT} }
  7         147  
773             );
774             }
775              
776 49         389 return;
777             }
778              
779             sub _trap {
780 0 0   0   0 return MCE::Signal::defer($_[0]) if $MCE::Signal::IPC;
781              
782 0         0 alarm 0; my ( $exit_status, $name ) = ( 2, @_ );
  0         0  
783 0         0 $_SELF->{SIGNALED} = 1, $name =~ s/^SIG//;
784              
785       0     $SIG{$name} = sub {}, CORE::kill($name, -$$)
786 0 0       0 if ( exists $SIG{$name} );
787              
788 0 0       0 if ( $name eq 'HUP' ) { $exit_status = 101 }
  0 0       0  
    0          
    0          
    0          
789 0         0 elsif ( $name eq 'INT' ) { $exit_status = 102 }
790 0         0 elsif ( $name eq 'ABRT' ) { $exit_status = 106 }
791 0         0 elsif ( $name eq 'SEGV' ) { $exit_status = 111 }
792 0         0 elsif ( $name eq 'TERM' ) { $exit_status = 115 }
793              
794 0 0       0 if ( ! $_SELF->{IGNORE} ) {
795 0         0 my ( $pkg, $wrk_id ) = ( $_SELF->{PKG}, $_SELF->{WRK_ID} );
796 0         0 $_DATA->{$pkg}->set('R'.$wrk_id, '');
797             }
798              
799 0         0 _exit($exit_status);
800             }
801              
802             sub _wait_one {
803 4     4   48 my ( $pkg ) = @_;
804 4         40 my ( $list, $self, $wrk_id ) = ( $_LIST->{$pkg} ); local $!;
  4         132  
805              
806 4         28 while () {
807 252         2808 for my $hobo ( $list->vals() ) {
808 252         740 $wrk_id = $hobo->{WRK_ID};
809 252 50       1088 return $list->del($wrk_id) if $hobo->{REAPED};
810 252 100       3464 $self = $list->del($wrk_id), last if waitpid($wrk_id, _WNOHANG);
811             }
812 252 100       912 last if $self;
813 248         7473408 sleep 0.030;
814             }
815              
816 4         132 _reap_hobo($self, 0);
817              
818 4         52 $self;
819             }
820              
821             ###############################################################################
822             ## ----------------------------------------------------------------------------
823             ## Delay implementation suited for MCE::Hobo.
824             ##
825             ###############################################################################
826              
827             package # hide from rpm
828             MCE::Hobo::_delay;
829              
830             sub new {
831 15     15   179 my ( $class, $chnl, $delay ) = @_;
832              
833 15 50       152 if ( !defined $delay ) {
834 15 50       176 $delay = ($^O =~ /mswin|mingw|msys|cygwin/i) ? 0.015 : 0.008;
835             }
836              
837 15         127 $chnl->send(undef);
838              
839 15         990 bless [ $delay, $chnl ], $class;
840             }
841              
842             sub seconds {
843 0     0   0 my ( $self, $how_long ) = @_;
844 0 0       0 my $delay = defined($how_long) ? $how_long : $self->[0];
845 0         0 my $lapse = $self->[1]->recv();
846 0         0 my $time = MCE::Util::_time();
847              
848 0 0 0     0 if ( !$delay || !defined $lapse ) {
    0          
849 0         0 $lapse = $time;
850             }
851             elsif ( $lapse + $delay - $time < 0 ) {
852 0         0 $lapse += int( abs($time - $lapse) / $delay + 0.5 ) * $delay;
853             }
854              
855 0         0 $self->[1]->send( $lapse += $delay );
856              
857 0         0 return $lapse - $time;
858             }
859              
860             ###############################################################################
861             ## ----------------------------------------------------------------------------
862             ## Hash and ordhash implementations suited for MCE::Hobo.
863             ##
864             ###############################################################################
865              
866             package # hide from rpm
867             MCE::Hobo::_hash;
868              
869 15     15   36375 use MCE::Shared ();
  15         42  
  15         466  
870 15     15   106 use Time::HiRes 'sleep';
  15         52  
  15         90  
871              
872             use constant {
873 15 50       12975 _WNOHANG => ( $INC{'POSIX.pm'} )
    50          
874             ? &POSIX::WNOHANG : ( $^O eq 'solaris' ) ? 64 : 1
875 15     15   2003 };
  15         30  
876              
877             sub new {
878 15     15   523 bless \ MCE::Shared->share({ module => 'MCE::Shared::Hash' }), shift;
879             }
880              
881 7     7   96 sub clear { ${ $_[0] }->clear(); }
  7         175  
882 15     15   75 sub exists { ${ $_[0] }->exists($_[1]); }
  15         670  
883 24     24   129 sub set { ${ $_[0] }->set($_[1], $_[2]); }
  24         977  
884              
885             sub get {
886 49     49   217 my ( $self, $wrk_id, $wait_flag ) = @_;
887              
888 49 50       233 if ( $wait_flag ) {
889 0         0 local $!;
890 0 0       0 ( ${ $self }->exists('R'.$wrk_id) ) ? waitpid($wrk_id, 0) : do {
  0         0  
891 0         0 while () {
892 0 0       0 if ( ! ${ $self }->exists('R'.$wrk_id) ) {
  0         0  
893 0 0       0 last if waitpid($wrk_id, _WNOHANG);
894 0         0 sleep(0.030), next;
895             }
896 0         0 waitpid($wrk_id, 0), last;
897             }
898             };
899             }
900              
901 49         116 ${ $self }->_get_hobo_data($wrk_id);
  49         723  
902             }
903              
904             package # hide from rpm
905             MCE::Hobo::_ordhash;
906              
907 15     15   303 sub new { bless [ {}, [], {}, 0 ], shift; } # data, keys, indx, gcnt
908 0     0   0 sub exists { CORE::exists $_[0]->[0]{ $_[1] }; }
909 0     0   0 sub get { $_[0]->[0]{ $_[1] }; }
910 23     23   151 sub len { scalar keys %{ $_[0]->[0] }; }
  23         307  
911              
912             sub clear {
913 0     0   0 my ( $self ) = @_;
914 0         0 %{ $self->[0] } = @{ $self->[1] } = %{ $self->[2] } = (), $self->[3] = 0;
  0         0  
  0         0  
  0         0  
915              
916 0         0 return;
917             }
918              
919             sub del {
920 49     49   242 my ( $self, $key ) = @_;
921 49 50       517 return undef unless defined( my $off = delete $self->[2]{$key} );
922              
923             # tombstone
924 49         190 $self->[1][$off] = undef;
925              
926             # GC keys and refresh index
927 49 100       268 if ( ++$self->[3] > @{ $self->[1] } * 0.667 ) {
  49         457  
928 21         106 my ( $keys, $indx ) = ( $self->[1], $self->[2] );
929 21         103 my $i; $i = $self->[3] = 0;
  21         80  
930 21         74 for my $k ( @{ $keys } ) {
  21         321  
931 49 50       292 $keys->[$i] = $k, $indx->{$k} = $i++ if defined($k);
932             }
933 21         108 splice @{ $keys }, $i;
  21         171  
934             }
935              
936 49         1328 delete $self->[0]{$key};
937             }
938              
939             sub set {
940 58     58   1725 my ( $self, $key ) = @_;
941 58 50       1526 $self->[0]{$key} = $_[2], return 1 if exists($self->[0]{$key});
942              
943 58         485 $self->[2]{$key} = @{ $self->[1] }; push @{ $self->[1] }, $key;
  58         2770  
  58         280  
  58         1623  
944 58         1125 $self->[0]{$key} = $_[2];
945              
946 58         252 return 1;
947             }
948              
949             sub vals {
950 285     285   1063 my ( $self ) = @_;
951             $self->[3]
952 0         0 ? @{ $self->[0] }{ grep defined($_), @{ $self->[1] } }
  0         0  
953 285 50       1347 : @{ $self->[0] }{ @{ $self->[1] } };
  285         1839  
  285         842  
954             }
955              
956             1;
957              
958             __END__