File Coverage

blib/lib/MCE/Hobo.pm
Criterion Covered Total %
statement 341 510 66.8
branch 162 412 39.3
condition 51 161 31.6
subroutine 56 75 74.6
pod 23 23 100.0
total 633 1181 53.6


line stmt bran cond sub pod time code
1             ###############################################################################
2             ## ----------------------------------------------------------------------------
3             ## A threads-like parallelization module.
4             ##
5             ###############################################################################
6              
7 15     15   888939 use strict;
  15         55  
  15         360  
8 15     15   72 use warnings;
  15         15  
  15         321  
9              
10 15     15   222 use 5.010001;
  15         34  
11              
12 15     15   72 no warnings qw( threads recursion uninitialized once redefine );
  15         30  
  15         955  
13              
14             package MCE::Hobo;
15              
16             our $VERSION = '1.885';
17              
18             ## no critic (BuiltinFunctions::ProhibitStringyEval)
19             ## no critic (Subroutines::ProhibitExplicitReturnUndef)
20             ## no critic (Subroutines::ProhibitSubroutinePrototypes)
21             ## no critic (TestingAndDebugging::ProhibitNoStrict)
22              
23 15     15   6377 use MCE::Signal ();
  15         49110  
  15         330  
24 15     15   5622 use MCE::Mutex ();
  15         5171  
  15         254  
25 15     15   5879 use MCE::Channel ();
  15         230917  
  15         360  
26 15     15   94 use Time::HiRes 'sleep';
  15         30  
  15         74  
27              
28             use overload (
29             q(==) => \&equal,
30 0     0   0 q(!=) => sub { !equal(@_) },
31 15         143 fallback => 1
32 15     15   16185 );
  15         12213  
33              
34             sub import {
35 15 50   15   186 if (caller !~ /^MCE::/) {
36 15     15   1529 no strict 'refs'; no warnings 'redefine';
  15     15   41  
  15         484  
  15         65  
  15         29  
  15         1397  
37 15         30 *{ caller().'::mce_async' } = \&mce_async;
  15         80  
38             }
39 15         173 return;
40             }
41              
42             ## The POSIX module has many symbols. Try not loading it simply
43             ## to have WNOHANG. The following covers most platforms.
44              
45             use constant {
46 15 50       59376 _WNOHANG => ( $INC{'POSIX.pm'} )
    50          
47             ? &POSIX::WNOHANG : ( $^O eq 'solaris' ) ? 64 : 1
48 15     15   79 };
  15         29  
49              
50             my ( $_MNGD, $_DATA, $_DELY, $_LIST ) = ( {}, {}, {}, {} );
51              
52             my $_freeze = MCE::Channel::_get_freeze();
53             my $_thaw = MCE::Channel::_get_thaw();
54              
55             my $_is_MSWin32 = ( $^O eq 'MSWin32' ) ? 1 : 0;
56             my $_tid = ( $INC{'threads.pm'} ) ? threads->tid() : 0;
57              
58             sub CLONE {
59 0 0   0   0 $_tid = threads->tid(), &_clear() if $INC{'threads.pm'};
60             }
61              
62             sub _clear {
63 0     0   0 %{ $_LIST } = ();
  0         0  
64             }
65              
66             sub _max_workers {
67 10     10   18 my ( $cpus ) = @_;
68 10 100       49 if ( $cpus eq 'auto' ) {
    100          
69 1         16 $cpus = MCE::Util::get_ncpu();
70             }
71             elsif ( $cpus =~ /^([0-9.]+)%$/ ) {
72 6         32 my ( $percent, $ncpu ) = ( $1 / 100, MCE::Util::get_ncpu() );
73 6         24 $cpus = $ncpu * $percent + 0.5;
74             }
75 10 100 66     98 $cpus = 1 if $cpus !~ /^[\d\.]+$/ || $cpus < 1;
76 10         24 return int($cpus);
77             }
78              
79             ###############################################################################
80             ## ----------------------------------------------------------------------------
81             ## Init routine.
82             ##
83             ###############################################################################
84              
85             bless my $_SELF = { MGR_ID => "$$.$_tid", WRK_ID => $$ }, __PACKAGE__;
86              
87             sub init {
88 22 100 66 22 1 3117 shift if ( defined $_[0] && $_[0] eq __PACKAGE__ );
89              
90             # -- options ----------------------------------------------------------
91             # max_workers hobo_timeout posix_exit on_start on_finish void_context
92             # ---------------------------------------------------------------------
93              
94 22 100       213 my $pkg = "$$.$_tid.".( caller eq __PACKAGE__ ? caller(1) : caller );
95 22 50       177 my $mngd = $_MNGD->{$pkg} = ( ref $_[0] eq 'HASH' ) ? shift : { @_ };
96              
97 22         56 @_ = ();
98              
99             $mngd->{MGR_ID} = "$$.$_tid", $mngd->{PKG} = $pkg,
100 22         189 $mngd->{WRK_ID} = $$;
101              
102 22 100       140 &_force_reap($pkg), $_DATA->{$pkg}->clear() if ( exists $_LIST->{$pkg} );
103              
104 22 100       65 if ( !exists $_LIST->{$pkg} ) {
105 15 0 33     42 $MCE::_GMUTEX->lock() if ( $_tid && $MCE::_GMUTEX );
106 15 50       34 sleep 0.015 if $_tid;
107              
108             # Start the shared-manager process if not running.
109 15 50       291 MCE::Shared->start() if $INC{'MCE/Shared.pm'};
110              
111 15         853 my $chnl = MCE::Channel->new( impl => 'Mutex' );
112 15         73423 $_LIST->{ $pkg } = MCE::Hobo::_ordhash->new();
113 15         374 $_DELY->{ $pkg } = MCE::Hobo::_delay->new( $chnl );
114 15         258 $_DATA->{ $pkg } = MCE::Hobo::_hash->new();
115 15         519 $_DATA->{"$pkg:seed"} = int(rand() * 1e9);
116 15         60 $_DATA->{"$pkg:id" } = 0;
117              
118 15 0 33     72 $MCE::_GMUTEX->unlock() if ( $_tid && $MCE::_GMUTEX );
119             }
120              
121 22 50       84 if ( !exists $mngd->{posix_exit} ) {
122             $mngd->{posix_exit} = 1 if (
123             $^S || $_tid || $INC{'Mojo/IOLoop.pm'} ||
124             $INC{'Coro.pm'} || $INC{'LWP/UserAgent.pm'} || $INC{'stfl.pm'} ||
125             $INC{'Curses.pm'} || $INC{'CGI.pm'} || $INC{'FCGI.pm'} ||
126             $INC{'Tk.pm'} || $INC{'Wx.pm'} || $INC{'Win32/GUI.pm'} ||
127 22 50 33     1219 $INC{'Gearman/Util.pm'} || $INC{'Gearman/XS.pm'}
      33        
      33        
      33        
      33        
      33        
      33        
      33        
      33        
      33        
      33        
      33        
      33        
128             );
129             }
130              
131 22 100       108 if ( defined $mngd->{max_workers} ) {
132 3         21 $mngd->{max_workers} = _max_workers($mngd->{max_workers});
133             }
134              
135 22 50 33     93 if ( $INC{'LWP/UserAgent.pm'} && !$INC{'Net/HTTP.pm'} ) {
136 0         0 local $@; eval 'require Net::HTTP; require Net::HTTPS';
  0         0  
137             }
138              
139             require POSIX
140 22 50 66     4423 if ( $mngd->{on_finish} && !$INC{'POSIX.pm'} && !$_is_MSWin32 );
      66        
141              
142 22         32683 return;
143             }
144              
145             ###############################################################################
146             ## ----------------------------------------------------------------------------
147             ## 'new', 'mce_async', and 'create' for threads-like similarity.
148             ##
149             ###############################################################################
150              
151             ## 'new' and 'tid' are aliases for 'create' and 'pid' respectively.
152              
153             *new = \&create, *tid = \&pid;
154              
155             ## Use "goto" trick to avoid pad problems from 5.8.1 (fixed in 5.8.2)
156             ## Tip found in threads::async.
157              
158             sub mce_async (&;@) {
159 0     0 1 0 goto &create;
160             }
161              
162             sub create {
163 70   66 70 1 107689 my $mngd = $_MNGD->{ "$$.$_tid.".caller() } || do {
164             # construct mngd internally on first use unless defined
165             init(); $_MNGD->{ "$$.$_tid.".caller() };
166             };
167              
168 70 50       642 shift if ( $_[0] eq __PACKAGE__ );
169              
170             # ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~
171              
172 70 50       554 my $self = bless ref $_[0] eq 'HASH' ? { %{ shift() } } : { }, __PACKAGE__;
  0         0  
173              
174 70 50       334 $self->{IGNORE} = 1 if $SIG{CHLD} eq 'IGNORE';
175 70         885 $self->{MGR_ID} = $mngd->{MGR_ID}, $self->{PKG} = $mngd->{PKG};
176 70 50 33     351 $self->{ident } = shift if ( !ref $_[0] && ref $_[1] eq 'CODE' );
177              
178 70 0 33     137 my $func = shift; $func = caller().'::'.$func
  70   33     189  
179             if ( !ref $func && length $func && index($func,':') < 0 );
180              
181 70 50       892 if ( !defined $func ) {
182 0         0 local $\; print {*STDERR} "code function is not specified or valid\n";
  0         0  
  0         0  
183 0         0 return undef;
184             }
185              
186             my ( $list, $max_workers, $pkg ) = (
187             $_LIST->{ $mngd->{PKG} }, $mngd->{max_workers}, $mngd->{PKG}
188 70         275 );
189              
190 70 50       561 $_DATA->{"$pkg:id"} = 10000 if ( ( my $id = ++$_DATA->{"$pkg:id"} ) >= 2e9 );
191              
192 70 50 33     1980 if ( $max_workers || $self->{IGNORE} ) {
193 0         0 my $wrk_id; local $!;
  0         0  
194              
195             # Reap completed hobo processes.
196 0         0 for my $hobo ( $list->vals() ) {
197 0         0 $wrk_id = $hobo->{WRK_ID};
198 0 0       0 $list->del($wrk_id), next if $hobo->{REAPED};
199 0 0       0 waitpid($wrk_id, _WNOHANG) or next;
200 0         0 _reap_hobo($list->del($wrk_id), 0);
201             }
202              
203             # Wait for a slot if saturated.
204 0 0 0     0 if ( $max_workers && $list->len() >= $max_workers ) {
205 0         0 my $count = $list->len() - $max_workers + 1;
206 0         0 _wait_one($pkg) for 1 .. $count;
207             }
208             }
209              
210             # ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~
211              
212 70 0 33     193 $MCE::_GMUTEX->lock() if ( $_tid && $MCE::_GMUTEX );
213              
214 70         129 my @args = @_; @_ = (); # To avoid (Scalars leaked: N) messages
  70         106  
215 70         130 my ( $killed, $pid );
216              
217             {
218 70     0   105 local $SIG{TERM} = local $SIG{INT} = sub { $killed = $_[0] }
  0         0  
219 70 50 33     2901 if ( !$_is_MSWin32 && $] ge '5.010001' );
220              
221             local $SIG{TTIN}, local $SIG{TTOU}, local $SIG{WINCH}
222 70 50       2020 if ( !$_is_MSWin32 );
223              
224 70         83594 $pid = fork();
225              
226 70 50       3481 if ( !defined $pid ) { # error
    100          
227 0         0 local $\; print {*STDERR} "fork error: $!\n";
  0         0  
  0         0  
228             }
229             elsif ( $pid ) { # parent
230 58         2307 $self->{WRK_ID} = $pid;
231 58         2054 $list->set($pid, $self);
232 58 100       7397 $mngd->{on_start}->($pid, $self->{ident}) if $mngd->{on_start};
233             }
234             else { # child
235 12         423 %{ $_LIST } = (), $_SELF = $self;
  12         1453  
236              
237             local $SIG{TERM} = local $SIG{INT} = local $SIG{ABRT} = \&_trap,
238             local $SIG{SEGV} = local $SIG{HUP} = \&_trap,
239 12         1700 local $SIG{QUIT} = \&_quit;
240 12         263 local $SIG{CHLD};
241              
242 12 50       1103 MCE::Shared::init() if $INC{'MCE/Shared.pm'};
243 12 50       426 $_DATA->{ $_SELF->{PKG} }->set('S'.$$, '') unless $self->{IGNORE};
244 12 50       58 CORE::kill($killed, $$) if $killed;
245              
246             # Sets the seed of the base generator uniquely between workers.
247             # The new seed is computed using the current seed and ID value.
248             # One may set the seed at the application level for predictable
249             # results. Ditto for Math::Prime::Util, Math::Random, and
250             # Math::Random::MT::Auto.
251              
252 12         96 srand( abs($_DATA->{"$pkg:seed"} - ($id * 100000)) % 2147483560 );
253              
254 12 50       63 if ( $INC{'Math/Prime/Util.pm'} ) {
255             Math::Prime::Util::srand(
256 0         0 abs($_DATA->{"$pkg:seed"} - ($id * 100000)) % 2147483560
257             );
258             }
259              
260 12 50       72 if ( $INC{'Math/Random.pm'} ) {
261 0         0 my $cur_seed = Math::Random::random_get_seed();
262 0 0       0 my $new_seed = ($cur_seed < 1073741781)
263             ? $cur_seed + ((abs($id) * 10000) % 1073741780)
264             : $cur_seed - ((abs($id) * 10000) % 1073741780);
265              
266 0         0 Math::Random::random_set_seed($new_seed, $new_seed);
267             }
268              
269 12 50       59 if ( $INC{'Math/Random/MT/Auto.pm'} ) {
270 0         0 my $cur_seed = Math::Random::MT::Auto::get_seed()->[0];
271 0 0       0 my $new_seed = ($cur_seed < 1073741781)
272             ? $cur_seed + ((abs($id) * 10000) % 1073741780)
273             : $cur_seed - ((abs($id) * 10000) % 1073741780);
274              
275 0         0 Math::Random::MT::Auto::set_seed($new_seed);
276             }
277              
278 12         367 _dispatch($mngd, $func, \@args);
279             }
280             }
281              
282 58 0 33     1995 $MCE::_GMUTEX->unlock() if ( $_tid && $MCE::_GMUTEX );
283              
284 58 50       239 CORE::kill($killed, $$) if $killed;
285              
286 58 50       8214 return $pid ? $self : undef;
287             }
288              
289             ###############################################################################
290             ## ----------------------------------------------------------------------------
291             ## Public methods.
292             ##
293             ###############################################################################
294              
295             sub equal {
296 0 0 0 0 1 0 return 0 unless ( ref $_[0] && ref $_[1] );
297 0 0       0 $_[0]->{WRK_ID} == $_[1]->{WRK_ID} ? 1 : 0;
298             }
299              
300             sub error {
301 31 50   31 1 239 _croak('Usage: $hobo->error()') unless ref( my $self = $_[0] );
302 31 50       133 $self->join() unless $self->{REAPED};
303 31 50       273 $self->{ERROR} || undef;
304             }
305              
306             sub exit {
307 10 50 33 10 1 365 shift if ( defined $_[0] && $_[0] eq __PACKAGE__ );
308              
309 10 50       170 my ( $self ) = ( ref $_[0] ? shift : $_SELF );
310 10         90 my ( $pkg, $wrk_id ) = ( $self->{PKG}, $self->{WRK_ID} );
311              
312 10 50 33     235 if ( $wrk_id == $$ && $self->{MGR_ID} eq "$$.$_tid" ) {
    50          
313 0         0 MCE::Hobo->finish('MCE'); CORE::exit(@_);
  0         0  
314             }
315             elsif ( $wrk_id == $$ ) {
316 0   0     0 alarm 0; my ( $exit_status, @res ) = @_; $? = $exit_status || 0;
  0         0  
  0         0  
317 0 0       0 $_DATA->{$pkg}->set('R'.$wrk_id, @res ? $_freeze->(\@res) : '');
318 0         0 die "Hobo exited ($?)\n";
319 0         0 _exit($?); # not reached
320             }
321              
322 10 50       30 return $self if $self->{REAPED};
323              
324 10 50       85 if ( exists $_DATA->{$pkg} ) {
325 10         140 sleep 0.015 until $_DATA->{$pkg}->exists('S'.$wrk_id);
326             } else {
327 0         0 sleep 0.030;
328             }
329              
330 10 50       30 if ($_is_MSWin32) {
331 0 0       0 CORE::kill('KILL', $wrk_id) if CORE::kill('ZERO', $wrk_id);
332             } else {
333 10 50       400 CORE::kill('QUIT', $wrk_id) if CORE::kill('ZERO', $wrk_id);
334             }
335              
336 10         55 $self;
337             }
338              
339             sub finish {
340 7 50   7 1 2258 _croak('Usage: MCE::Hobo->finish()') if ref($_[0]);
341 7 50 33     61 shift if ( defined $_[0] && $_[0] eq __PACKAGE__ );
342              
343 7 100       38 my $pkg = defined($_[0]) ? $_[0] : caller();
344              
345 7 100       66 if ( $pkg eq 'MCE' ) {
    100          
346 3         7 for my $key ( keys %{ $_LIST } ) { MCE::Hobo->finish($key); }
  3         38  
  3         48  
347             }
348             elsif ( exists $_LIST->{$pkg} ) {
349 3 50       10 return if $MCE::Signal::KILLED;
350              
351 3 50       10 if ( exists $_DELY->{$pkg} ) {
352 3         32 &_force_reap($pkg);
353             delete($_DELY->{$pkg}), delete($_DATA->{"$pkg:seed"}),
354             delete($_LIST->{$pkg}), delete($_DATA->{"$pkg:id"}),
355 3         132 delete($_MNGD->{$pkg}), delete($_DATA->{ $pkg });
356             }
357             }
358              
359 7         31 @_ = ();
360              
361 7         19 return;
362             }
363              
364             sub is_joinable {
365 24 50   24 1 72 _croak('Usage: $hobo->is_joinable()') unless ref( my $self = $_[0] );
366 24         64 my ( $wrk_id, $pkg ) = ( $self->{WRK_ID}, $self->{PKG} );
367              
368 24 50       120 if ( $wrk_id == $$ ) {
    50          
369 0         0 '';
370             }
371             elsif ( $self->{MGR_ID} eq "$$.$_tid" ) {
372 24 50       48 return '' if $self->{REAPED};
373 24         64 local $!;
374 24 50       280 ( waitpid($wrk_id, _WNOHANG) == 0 ) ? '' : do {
375 0 0       0 _reap_hobo($self, 0) unless $self->{REAPED};
376 0         0 1;
377             };
378             }
379             else {
380             _croak('Error: $hobo->is_joinable() not called by managed process')
381 0 0       0 if ( $self->{IGNORE} );
382              
383 0 0       0 return '' if $self->{REAPED};
384 0 0       0 $_DATA->{$pkg}->exists('R'.$wrk_id) ? 1 : '';
385             }
386             }
387              
388             sub is_running {
389 24 50   24 1 8736 _croak('Usage: $hobo->is_running()') unless ref( my $self = $_[0] );
390 24         56 my ( $wrk_id, $pkg ) = ( $self->{WRK_ID}, $self->{PKG} );
391              
392 24 50       120 if ( $wrk_id == $$ ) {
    50          
393 0         0 1;
394             }
395             elsif ( $self->{MGR_ID} eq "$$.$_tid" ) {
396 24 50       56 return '' if $self->{REAPED};
397 24         48 local $!;
398 24 50       312 ( waitpid($wrk_id, _WNOHANG) == 0 ) ? 1 : do {
399 0 0       0 _reap_hobo($self, 0) unless $self->{REAPED};
400 0         0 '';
401             };
402             }
403             else {
404             _croak('Error: $hobo->is_running() not called by managed process')
405 0 0       0 if ( $self->{IGNORE} );
406              
407 0 0       0 return '' if $self->{REAPED};
408 0 0       0 $_DATA->{$pkg}->exists('R'.$wrk_id) ? '' : 1;
409             }
410             }
411              
412             sub join {
413 45 50   45 1 23047 _croak('Usage: $hobo->join()') unless ref( my $self = $_[0] );
414 45         146 my ( $wrk_id, $pkg ) = ( $self->{WRK_ID}, $self->{PKG} );
415              
416 45 50       115 if ( $self->{REAPED} ) {
417 0 0       0 _croak('Hobo already joined') unless exists( $self->{RESULT} );
418 0 0       0 $_LIST->{$pkg}->del($wrk_id) if ( exists $_LIST->{$pkg} );
419              
420             return ( defined wantarray )
421 0 0       0 ? wantarray ? @{ delete $self->{RESULT} } : delete( $self->{RESULT} )->[-1]
  0 0       0  
422             : ();
423             }
424              
425 45 50       267 if ( $wrk_id == $$ ) {
    50          
426 0         0 _croak('Cannot join self');
427             }
428             elsif ( $self->{MGR_ID} eq "$$.$_tid" ) {
429             # remove from list after reaping
430 45 50       85 if ( $_tid ) {
431 0         0 local $SIG{CHLD};
432 0         0 _reap_hobo($self, 1);
433 0         0 $_LIST->{$pkg}->del($wrk_id);
434             }
435             else {
436 45         750 local ($SIG{CHLD}, $!);
437 45         25773476 waitpid($wrk_id, 0);
438 45         450 _reap_hobo($self, 0);
439 45         213 $_LIST->{$pkg}->del($wrk_id);
440             }
441             }
442             else {
443             _croak('Error: $hobo->join() not called by managed process')
444 0 0       0 if ( $self->{IGNORE} );
445              
446 0         0 sleep 0.3 until ( $_DATA->{$pkg}->exists('R'.$wrk_id) );
447 0         0 _reap_hobo($self, 0);
448             }
449              
450 45 50       181 return unless ( exists $self->{RESULT} );
451              
452             ( defined wantarray )
453 45 50       423 ? wantarray ? @{ delete $self->{RESULT} } : delete( $self->{RESULT} )->[-1]
  0 100       0  
454             : ();
455             }
456              
457             sub kill {
458 5 50   5 1 155 _croak('Usage: $hobo->kill()') unless ref( my $self = $_[0] );
459 5         65 my ( $wrk_id, $pkg, $signal ) = ( $self->{WRK_ID}, $self->{PKG}, $_[1] );
460              
461 5 50       25 if ( $wrk_id == $$ ) {
462 0   0     0 CORE::kill($signal || 'INT', $$);
463 0         0 return $self;
464             }
465 5 50       45 if ( $self->{MGR_ID} eq "$$.$_tid" ) {
466 5 50       35 return $self if $self->{REAPED};
467 5 50       15 if ( exists $_DATA->{$pkg} ) {
468 5         25 sleep 0.015 until $_DATA->{$pkg}->exists('S'.$wrk_id);
469             } else {
470 0         0 sleep 0.030;
471             }
472             }
473              
474 5 50 50     180 CORE::kill($signal || 'INT', $wrk_id) if CORE::kill('ZERO', $wrk_id);
475              
476 5         25 $self;
477             }
478              
479             sub list {
480 8 50   8 1 3992 _croak('Usage: MCE::Hobo->list()') if ref($_[0]);
481 8         40 my $pkg = "$$.$_tid.".caller();
482              
483 8 50       64 ( exists $_LIST->{$pkg} ) ? $_LIST->{$pkg}->vals() : ();
484             }
485              
486             sub list_pids {
487 8 50   8 1 336 _croak('Usage: MCE::Hobo->list_pids()') if ref($_[0]);
488 8         256 my $pkg = "$$.$_tid.".caller(); local $_;
  8         80  
489              
490 8 50       264 ( exists $_LIST->{$pkg} ) ? map { $_->pid } $_LIST->{$pkg}->vals() : ();
  24         160  
491             }
492              
493             sub list_joinable {
494 8 50   8 1 5248 _croak('Usage: MCE::Hobo->list_joinable()') if ref($_[0]);
495 8         792 my $pkg = "$$.$_tid.".caller();
496              
497 8 50       360 return () unless ( my $list = $_LIST->{$pkg} );
498 8         48 local ($!, $?, $_);
499              
500             map {
501 8 50       56 ( waitpid($_->{WRK_ID}, _WNOHANG) == 0 ) ? () : do {
  24         200  
502 0 0       0 _reap_hobo($_, 0) unless $_->{REAPED};
503 0         0 $_;
504             };
505             }
506             $list->vals();
507             }
508              
509             sub list_running {
510 8 50   8 1 17208 _croak('Usage: MCE::Hobo->list_running()') if ref($_[0]);
511 8         56 my $pkg = "$$.$_tid.".caller();
512              
513 8 50       56 return () unless ( my $list = $_LIST->{$pkg} );
514 8         96 local ($!, $?, $_);
515              
516             map {
517 8 50       64 ( waitpid($_->{WRK_ID}, _WNOHANG) == 0 ) ? $_ : do {
  24         272  
518 0 0       0 _reap_hobo($_, 0) unless $_->{REAPED};
519 0         0 ();
520             };
521             }
522             $list->vals();
523             }
524              
525             sub max_workers {
526 17 50   17 1 67 _croak('Usage: MCE::Hobo->max_workers()') if ref($_[0]);
527 17   33     75 my $mngd = $_MNGD->{ "$$.$_tid.".caller() } || do {
528             # construct mngd internally on first use unless defined
529             init(); $_MNGD->{ "$$.$_tid.".caller() };
530             };
531 17 50       61 shift if ( $_[0] eq __PACKAGE__ );
532              
533 17 100       41 $mngd->{max_workers} = _max_workers(shift) if @_;
534 17         82 $mngd->{max_workers};
535             }
536              
537             sub pending {
538 8 50   8 1 4400 _croak('Usage: MCE::Hobo->pending()') if ref($_[0]);
539 8         80 my $pkg = "$$.$_tid.".caller();
540              
541 8 50       64 ( exists $_LIST->{$pkg} ) ? $_LIST->{$pkg}->len() : 0;
542             }
543              
544             sub pid {
545 31 50   31 1 141 ref($_[0]) ? $_[0]->{WRK_ID} : $_SELF->{WRK_ID};
546             }
547              
548             sub result {
549 7 50   7 1 102 _croak('Usage: $hobo->result()') unless ref( my $self = $_[0] );
550 7 50       27 return $self->join() unless $self->{REAPED};
551              
552 7 50       18 _croak('Hobo already joined') unless exists( $self->{RESULT} );
553 7 50       65 wantarray ? @{ delete $self->{RESULT} } : delete( $self->{RESULT} )->[-1];
  0         0  
554             }
555              
556             sub self {
557 0 0   0 1 0 ref($_[0]) ? $_[0] : $_SELF;
558             }
559              
560             sub wait_all {
561 1 50   1 1 90 _croak('Usage: MCE::Hobo->wait_all()') if ref($_[0]);
562 1         37 my $pkg = "$$.$_tid.".caller();
563              
564             return wantarray ? () : 0
565 1 0 33     54 if ( !exists $_LIST->{$pkg} || !$_LIST->{$pkg}->len() );
    50          
566              
567 1         14 local $_; ( wantarray )
568 0         0 ? map { $_->join(); $_ } $_LIST->{$pkg}->vals()
  0         0  
569 1 50       21 : map { $_->join(); () } $_LIST->{$pkg}->vals();
  3         27  
  3         7  
570             }
571              
572             *waitall = \&wait_all; # compatibility
573              
574             sub wait_one {
575 4 50   4 1 228 _croak('Usage: MCE::Hobo->wait_one()') if ref($_[0]);
576 4         140 my $pkg = "$$.$_tid.".caller();
577              
578             return undef
579 4 50 33     124 if ( !exists $_LIST->{$pkg} || !$_LIST->{$pkg}->len() );
580              
581 4         92 _wait_one($pkg);
582             }
583              
584             *waitone = \&wait_one; # compatibility
585              
586             sub yield {
587 0 0   0 1 0 _croak('Usage: MCE::Hobo->yield()') if ref($_[0]);
588 0 0 0     0 shift if ( defined $_[0] && $_[0] eq __PACKAGE__ );
589              
590 0   0     0 my $pkg = $_SELF->{PKG} || do {
591             my $mngd = $_MNGD->{ "$$.$_tid.".caller() } || do {
592             # construct mngd internally on first use unless defined
593             init(); $_MNGD->{ "$$.$_tid.".caller() };
594             };
595             $mngd->{PKG};
596             };
597              
598 0 0       0 return unless $_DELY->{$pkg};
599 0         0 my $seconds = $_DELY->{$pkg}->seconds(@_);
600              
601 0         0 MCE::Util::_sleep( $seconds );
602             }
603              
604             ###############################################################################
605             ## ----------------------------------------------------------------------------
606             ## Private methods.
607             ##
608             ###############################################################################
609              
610             sub _croak {
611 0 0   0   0 if ( $INC{'MCE.pm'} ) {
612 0         0 goto &MCE::_croak;
613             }
614             else {
615 0         0 $SIG{__DIE__} = \&MCE::Signal::_die_handler;
616 0         0 $SIG{__WARN__} = \&MCE::Signal::_warn_handler;
617              
618 0         0 $\ = undef; goto &Carp::croak;
  0         0  
619             }
620             }
621              
622             sub _dispatch {
623 12     12   79 my ( $mngd, $func, $args ) = @_;
624              
625 12         313 $mngd->{WRK_ID} = $_SELF->{WRK_ID} = $$, $? = 0;
626 12 50       76 $ENV{PERL_MCE_IPC} = 'win32' if $_is_MSWin32;
627              
628             {
629 12         50 local $!;
  12         294  
630 12 50       485 (*STDERR)->autoflush(1) if defined( fileno *STDERR );
631 12 50       2573 (*STDOUT)->autoflush(1) if defined( fileno *STDOUT );
632             }
633              
634             # Run task.
635             my $hobo_timeout = ( exists $_SELF->{hobo_timeout} )
636 12 50       484 ? $_SELF->{hobo_timeout} : $mngd->{hobo_timeout};
637              
638             my $void_context = ( exists $_SELF->{void_context} )
639 12 50       185 ? $_SELF->{void_context} : $mngd->{void_context};
640              
641 12         24 my @res; my $timed_out = 0;
  12         28  
642              
643             local $SIG{'ALRM'} = sub {
644 0     0   0 alarm 0; $timed_out = 1; $SIG{__WARN__} = sub {};
  0         0  
  0         0  
645 0         0 die "Hobo timed out\n";
646 12         578 };
647              
648 12 50 33     301 if ( $void_context || $_SELF->{IGNORE} ) {
649 15     15   108 no strict 'refs';
  15         29  
  15         623  
650 0   0     0 eval { alarm($hobo_timeout || 0); $func->(@{ $args }) };
  0         0  
  0         0  
  0         0  
651             }
652             else {
653 15     15   65 no strict 'refs';
  15         15  
  15         2606  
654 12   50     74 @res = eval { alarm($hobo_timeout || 0); $func->(@{ $args }) };
  12         710  
  12         33  
  12         165  
655             }
656              
657 9         26185 alarm 0;
658 9 50       65 $@ = "Hobo timed out" if $timed_out;
659              
660 9 50       44 if ( $@ ) {
661 0 0       0 _exit($?) if ( $@ =~ /^Hobo exited \(\S+\)$/ );
662 0         0 my $err = $@; $? = 1; $err =~ s/, <__ANONIO__> line \d+//;
  0         0  
  0         0  
663              
664 0 0       0 if ( ! $_SELF->{IGNORE} ) {
665             $_DATA->{ $_SELF->{PKG} }->set('S'.$$, $err),
666 0         0 $_DATA->{ $_SELF->{PKG} }->set('R'.$$, '');
667             }
668              
669 0 0 0     0 if ( !$timed_out && !$mngd->{on_finish} && !$INC{'MCE/Simple.pm'} ) {
      0        
670 15     15   7299 use bytes; warn "Hobo $$ terminated abnormally: reason $err\n";
  15         191  
  15         75  
  0         0  
671             }
672             }
673             else {
674 9 50       78 shift(@res) if ref($res[0]) =~ /^MCE::(?:Barrier|Semaphore)::_guard/s;
675             $_DATA->{ $_SELF->{PKG} }->set('R'.$$, @res ? $_freeze->(\@res) : '')
676 9 50       250 if ( ! $_SELF->{IGNORE} );
    50          
677             }
678              
679 9         229 _exit($?);
680             }
681              
682             sub _exit {
683 12     12   71 my ( $exit_status ) = @_;
684              
685             # Check for nested workers not yet joined.
686 12 50 66     192 MCE::Hobo->finish('MCE') if ( !$_SELF->{SIGNALED} && keys %{ $_LIST } );
  9         95  
687              
688             # Exit hobo process.
689 12 50   0   309 $SIG{__DIE__} = sub {} unless $_tid;
690 12     0   257 $SIG{__WARN__} = sub {};
691              
692 12 50 33     106 threads->exit($exit_status) if ( $INC{'threads.pm'} && $_is_MSWin32 );
693              
694             my $posix_exit = ( exists $_SELF->{posix_exit} )
695 12 50       100 ? $_SELF->{posix_exit} : $_MNGD->{ $_SELF->{PKG} }{posix_exit};
696              
697 12 0 33     96 if ( $posix_exit && !$_SELF->{SIGNALED} && !$_is_MSWin32 ) {
      33        
698 0         0 eval { MCE::Mutex::Channel::_destroy() };
  0         0  
699 0 0       0 POSIX::_exit($exit_status) if $INC{'POSIX.pm'};
700 0         0 CORE::kill('KILL', $$);
701             }
702              
703 12         4227 CORE::exit($exit_status);
704             }
705              
706             sub _force_reap {
707 10     10   47 my ( $count, $pkg ) = ( 0, @_ );
708 10 50 33     137 return unless ( exists $_LIST->{$pkg} && $_LIST->{$pkg}->len() );
709              
710 0         0 for my $hobo ( $_LIST->{$pkg}->vals() ) {
711 0 0       0 next if $hobo->{IGNORE};
712              
713 0 0       0 if ( $hobo->is_running() ) {
714 0 0       0 sleep(0.015), CORE::kill('KILL', $hobo->pid())
715             if CORE::kill('ZERO', $hobo->pid());
716 0         0 $count++;
717             }
718             }
719              
720 0         0 $_LIST->{$pkg}->clear();
721              
722 0 0 0     0 warn "Finished with active hobo processes [$pkg] ($count)\n"
723             if ( $count && !$_is_MSWin32 );
724              
725 0         0 return;
726             }
727              
728             sub _quit {
729 3 50   3   2682 return MCE::Signal::defer($_[0]) if $MCE::Signal::IPC;
730              
731 3         14 alarm 0; my ( $name ) = @_;
  3         14  
732 3         76 $_SELF->{SIGNALED} = 1, $name =~ s/^SIG//;
733              
734       0     $SIG{$name} = sub {}, CORE::kill($name, -$$)
735 3 50       118 if ( exists $SIG{$name} );
736              
737 3 50       15 if ( ! $_SELF->{IGNORE} ) {
738 3         21 my ( $pkg, $wrk_id ) = ( $_SELF->{PKG}, $_SELF->{WRK_ID} );
739 3         18 $_DATA->{$pkg}->set('R'.$wrk_id, '');
740             }
741              
742 3         15 _exit(0);
743             }
744              
745             sub _reap_hobo {
746 49     49   128 my ( $hobo, $wait_flag ) = @_;
747 49 50       261 return unless $hobo;
748              
749 49         481 local @_ = $_DATA->{ $hobo->{PKG} }->get($hobo->{WRK_ID}, $wait_flag);
750              
751 49 100 50     1428 ( $hobo->{ERROR}, $hobo->{RESULT}, $hobo->{REAPED} ) =
752             ( pop || '', length $_[0] ? $_thaw->(pop) : [], 1 );
753              
754 49 50       300 return if $hobo->{IGNORE};
755              
756 49   50     351 my ( $exit, $err ) = ( $? || 0, $hobo->{ERROR} );
757 49         150 my ( $code, $sig ) = ( $exit >> 8, $exit & 0x7f );
758              
759 49 50 33     123 if ( $code > 100 && !$err ) {
760 0 0       0 $code = 2, $sig = 1, $err = 'Hobo received SIGHUP' if $code == 101;
761 0 0       0 $code = 2, $sig = 2, $err = 'Hobo received SIGINT' if $code == 102;
762 0 0       0 $code = 2, $sig = 6, $err = 'Hobo received SIGABRT' if $code == 106;
763 0 0       0 $code = 2, $sig = 11, $err = 'Hobo received SIGSEGV' if $code == 111;
764 0 0       0 $code = 2, $sig = 15, $err = 'Hobo received SIGTERM' if $code == 115;
765              
766 0         0 $hobo->{ERROR} = $err;
767             }
768              
769 49 100       156 if ( my $on_finish = $_MNGD->{ $hobo->{PKG} }{on_finish} ) {
770             $on_finish->(
771             $hobo->{WRK_ID}, $code, $hobo->{ident}, $sig, $err,
772 7         33 @{ $hobo->{RESULT} }
  7         64  
773             );
774             }
775              
776 49         198 return;
777             }
778              
779             sub _trap {
780 0 0   0   0 return MCE::Signal::defer($_[0]) if $MCE::Signal::IPC;
781              
782 0         0 alarm 0; my ( $exit_status, $name ) = ( 2, @_ );
  0         0  
783 0         0 $_SELF->{SIGNALED} = 1, $name =~ s/^SIG//;
784              
785       0     $SIG{$name} = sub {}, CORE::kill($name, -$$)
786 0 0       0 if ( exists $SIG{$name} );
787              
788 0 0       0 if ( $name eq 'HUP' ) { $exit_status = 101 }
  0 0       0  
    0          
    0          
    0          
789 0         0 elsif ( $name eq 'INT' ) { $exit_status = 102 }
790 0         0 elsif ( $name eq 'ABRT' ) { $exit_status = 106 }
791 0         0 elsif ( $name eq 'SEGV' ) { $exit_status = 111 }
792 0         0 elsif ( $name eq 'TERM' ) { $exit_status = 115 }
793              
794 0 0       0 if ( ! $_SELF->{IGNORE} ) {
795 0         0 my ( $pkg, $wrk_id ) = ( $_SELF->{PKG}, $_SELF->{WRK_ID} );
796 0         0 $_DATA->{$pkg}->set('R'.$wrk_id, '');
797             }
798              
799 0         0 _exit($exit_status);
800             }
801              
802             sub _wait_one {
803 4     4   32 my ( $pkg ) = @_;
804 4         16 my ( $list, $self, $wrk_id ) = ( $_LIST->{$pkg} ); local $!;
  4         124  
805              
806 4         32 while () {
807 208         2444 for my $hobo ( $list->vals() ) {
808 208         752 $wrk_id = $hobo->{WRK_ID};
809 208 50       780 return $list->del($wrk_id) if $hobo->{REAPED};
810 208 100       3020 $self = $list->del($wrk_id), last if waitpid($wrk_id, _WNOHANG);
811             }
812 208 100       640 last if $self;
813 204         6143420 sleep 0.030;
814             }
815              
816 4         28 _reap_hobo($self, 0);
817              
818 4         28 $self;
819             }
820              
821             ###############################################################################
822             ## ----------------------------------------------------------------------------
823             ## Delay implementation suited for MCE::Hobo.
824             ##
825             ###############################################################################
826              
827             package # hide from rpm
828             MCE::Hobo::_delay;
829              
830             sub new {
831 15     15   71 my ( $class, $chnl, $delay ) = @_;
832              
833 15 50       93 if ( !defined $delay ) {
834 15 50       216 $delay = ($^O =~ /mswin|mingw|msys|cygwin/i) ? 0.015 : 0.008;
835             }
836              
837 15         163 $chnl->send(undef);
838              
839 15         710 bless [ $delay, $chnl ], $class;
840             }
841              
842             sub seconds {
843 0     0   0 my ( $self, $how_long ) = @_;
844 0 0       0 my $delay = defined($how_long) ? $how_long : $self->[0];
845 0         0 my $lapse = $self->[1]->recv();
846 0         0 my $time = MCE::Util::_time();
847              
848 0 0 0     0 if ( !$delay || !defined $lapse ) {
    0          
849 0         0 $lapse = $time;
850             }
851             elsif ( $lapse + $delay - $time < 0 ) {
852 0         0 $lapse += int( abs($time - $lapse) / $delay + 0.5 ) * $delay;
853             }
854              
855 0         0 $self->[1]->send( $lapse += $delay );
856              
857 0         0 return $lapse - $time;
858             }
859              
860             ###############################################################################
861             ## ----------------------------------------------------------------------------
862             ## Hash and ordhash implementations suited for MCE::Hobo.
863             ##
864             ###############################################################################
865              
866             package # hide from rpm
867             MCE::Hobo::_hash;
868              
869 15     15   27720 use MCE::Shared ();
  15         43  
  15         385  
870 15     15   86 use Time::HiRes 'sleep';
  15         15  
  15         138  
871              
872             use constant {
873 15 50       10504 _WNOHANG => ( $INC{'POSIX.pm'} )
    50          
874             ? &POSIX::WNOHANG : ( $^O eq 'solaris' ) ? 64 : 1
875 15     15   1834 };
  15         30  
876              
877             sub new {
878 15     15   369 bless \ MCE::Shared->share({ module => 'MCE::Shared::Hash' }), shift;
879             }
880              
881 7     7   19 sub clear { ${ $_[0] }->clear(); }
  7         142  
882 15     15   25 sub exists { ${ $_[0] }->exists($_[1]); }
  15         435  
883 24     24   113 sub set { ${ $_[0] }->set($_[1], $_[2]); }
  24         644  
884              
885             sub get {
886 49     49   180 my ( $self, $wrk_id, $wait_flag ) = @_;
887              
888 49 50       143 if ( $wait_flag ) {
889 0         0 local $!;
890 0 0       0 ( ${ $self }->exists('R'.$wrk_id) ) ? waitpid($wrk_id, 0) : do {
  0         0  
891 0         0 while () {
892 0 0       0 if ( ! ${ $self }->exists('R'.$wrk_id) ) {
  0         0  
893 0 0       0 last if waitpid($wrk_id, _WNOHANG);
894 0         0 sleep(0.030), next;
895             }
896 0         0 waitpid($wrk_id, 0), last;
897             }
898             };
899             }
900              
901 49         86 ${ $self }->_get_hobo_data($wrk_id);
  49         599  
902             }
903              
904             package # hide from rpm
905             MCE::Hobo::_ordhash;
906              
907 15     15   193 sub new { bless [ {}, [], {}, 0 ], shift; } # data, keys, indx, gcnt
908 0     0   0 sub exists { CORE::exists $_[0]->[0]{ $_[1] }; }
909 0     0   0 sub get { $_[0]->[0]{ $_[1] }; }
910 23     23   166 sub len { scalar keys %{ $_[0]->[0] }; }
  23         1394  
911              
912             sub clear {
913 0     0   0 my ( $self ) = @_;
914 0         0 %{ $self->[0] } = @{ $self->[1] } = %{ $self->[2] } = (), $self->[3] = 0;
  0         0  
  0         0  
  0         0  
915              
916 0         0 return;
917             }
918              
919             sub del {
920 49     49   117 my ( $self, $key ) = @_;
921 49 50       224 return undef unless defined( my $off = delete $self->[2]{$key} );
922              
923             # tombstone
924 49         117 $self->[1][$off] = undef;
925              
926             # GC keys and refresh index
927 49 100       122 if ( ++$self->[3] > @{ $self->[1] } * 0.667 ) {
  49         242  
928 21         75 my ( $keys, $indx ) = ( $self->[1], $self->[2] );
929 21         36 my $i; $i = $self->[3] = 0;
  21         78  
930 21         53 for my $k ( @{ $keys } ) {
  21         183  
931 49 50       156 $keys->[$i] = $k, $indx->{$k} = $i++ if defined($k);
932             }
933 21         38 splice @{ $keys }, $i;
  21         68  
934             }
935              
936 49         818 delete $self->[0]{$key};
937             }
938              
939             sub set {
940 58     58   1017 my ( $self, $key ) = @_;
941 58 50       1220 $self->[0]{$key} = $_[2], return 1 if exists($self->[0]{$key});
942              
943 58         230 $self->[2]{$key} = @{ $self->[1] }; push @{ $self->[1] }, $key;
  58         1523  
  58         190  
  58         1539  
944 58         560 $self->[0]{$key} = $_[2];
945              
946 58         244 return 1;
947             }
948              
949             sub vals {
950 241     241   831 my ( $self ) = @_;
951             $self->[3]
952 0         0 ? @{ $self->[0] }{ grep defined($_), @{ $self->[1] } }
  0         0  
953 241 50       987 : @{ $self->[0] }{ @{ $self->[1] } };
  241         1438  
  241         791  
954             }
955              
956             1;
957              
958             __END__