File Coverage

lib/Parallel/WorkUnit.pm
Criterion Covered Total %
statement 342 471 72.6
branch 126 216 58.3
condition 11 21 52.3
subroutine 49 54 90.7
pod 11 11 100.0
total 539 773 69.7


line stmt bran cond sub pod time code
1             #
2             # Copyright (C) 2015-2023 Joelle Maslak
3             # All Rights Reserved - See License
4             #
5              
6             package Parallel::WorkUnit;
7             $Parallel::WorkUnit::VERSION = '2.232040'; # TRIAL
8 228     228   46269401 use v5.8;
  228         2440  
9              
10             # ABSTRACT: Provide multi-paradigm forking with ability to pass back data
11              
12 228     228   1230 use strict;
  228         456  
  228         4210  
13 228     228   1129 use warnings;
  228         356  
  228         5642  
14 228     228   1092 use autodie;
  228         363  
  228         1920  
15              
16 228     228   1273879 use Carp;
  228         461  
  228         15214  
17              
18 228     228   1576 use overload;
  228         524  
  228         1698  
19 228     228   145165 use IO::Handle;
  228         1406957  
  228         10702  
20 228     228   115878 use IO::Pipely qw(pipely);
  228         4040046  
  228         15532  
21 228     228   109897 use IO::Select;
  228         392088  
  228         11957  
22 228     228   2732 use POSIX ':sys_wait_h';
  228         658  
  228         2906  
23 228     228   409643 use Scalar::Util qw(blessed reftype weaken);
  228         451  
  228         14787  
24 228     228   158375 use Storable;
  228         768440  
  228         13191  
25 228     228   117065 use Try::Tiny;
  228         486998  
  228         1323473  
26              
27              
28             my @ALL_WU; # Holds all active work units so child processes can't
29             # mess with parent work units
30             # Note it holds a reference (strong) to a reference
31             # (weak).
32              
33              
34             sub use_anyevent {
35 4437 50   4437 1 30182 if ( $#_ == 0 ) {
    0          
36 4437         22908 return shift->{use_anyevent};
37             } elsif ( $#_ == 1 ) {
38 0         0 my ( $self, $val ) = @_;
39              
40 0         0 my ($old_val) = $self->{use_anyevent};
41 0         0 $self->{use_anyevent} = $val;
42              
43             # Trigger
44 0         0 $self->_set_anyevent( $val, $old_val );
45              
46 0         0 return $val;
47             } else {
48 0         0 confess("Invalid call");
49             }
50             }
51              
52              
53             # XXX: Add validation that _cv is a Maybe[AnyEvent::CondVar]
54             sub _cv {
55 3198 100   3198   17928 if ( $#_ == 0 ) {
    50          
56 2980         12500 return shift->{_cv};
57             } elsif ( $#_ == 1 ) {
58 218         3580 my ( $self, $val ) = @_;
59 218         5817 $self->{_cv} = $val;
60 218         3124 return $val;
61             } else {
62 0         0 confess("Invalid call");
63             }
64             }
65              
66             # XXX: Add validation that _last_error is a Maybe[Str]
67             sub _last_error {
68 10303 100   10303   32858 if ( $#_ == 0 ) {
    50          
69 10085         38897 return shift->{_last_error};
70             } elsif ( $#_ == 1 ) {
71 218         2997 my ( $self, $val ) = @_;
72 218         5256 $self->{_last_error} = $val;
73 218         2655 return $val;
74             } else {
75 0         0 confess("Invalid call");
76             }
77             }
78              
79             # XXX: Add validation that _ordered_count is a non-negative integer
80             sub _ordered_count {
81 5579 100   5579   20993 if ( $#_ == 0 ) {
    50          
82 2487         5142 my $self = shift;
83              
84             # Initialize
85 2487 100       8776 if ( !exists( $self->{_ordered_count} ) ) { $self->{_ordered_count} = 0; }
  136         461  
86              
87 2487         7030 return $self->{_ordered_count};
88             } elsif ( $#_ == 1 ) {
89 3092         8366 my ( $self, $val ) = @_;
90 3092         7807 $self->{_ordered_count} = $val;
91 3092         7062 return $val;
92             } else {
93 0         0 confess("Invalid call");
94             }
95             }
96              
97             # XXX: Add validation that _ordered_responses is an ArrayRef
98             sub _ordered_responses {
99 2923 100   2923   14233 if ( $#_ == 0 ) {
    50          
100 2318         5862 my $self = shift;
101              
102             # Initialize
103 2318 100       8280 if ( !exists( $self->{_ordered_responses} ) ) { $self->{_ordered_responses} = []; }
  164         1652  
104              
105 2318         15066 return $self->{_ordered_responses};
106             } elsif ( $#_ == 1 ) {
107 605         4228 my ( $self, $val ) = @_;
108 605         4828 $self->{_ordered_responses} = $val;
109 605         2578 return $val;
110             } else {
111 0         0 confess("Invalid call");
112             }
113             }
114              
115              
116             sub max_children {
117 8418 100   8418 1 89206 if ( $#_ == 0 ) {
    50          
118 7792         15024 my $self = shift;
119              
120 7792 100       17637 if ( !exists( $self->{max_children} ) ) { $self->{max_children} = 5; }
  61         220  
121              
122 7792         65900 return $self->{max_children};
123             } elsif ( $#_ == 1 ) {
124 626         1540 my ( $self, $val ) = @_;
125              
126             # Validate
127 626 100       2428 if ( defined($val) ) {
128 554 100       6969 if ( $val !~ m/^[0-9]+$/s ) {
129 92         11270 confess("max_children must be set to a positive integer");
130             }
131 462 100       1977 if ( $val <= 0 ) {
132 46         3726 confess("max_children must be set to a positive integer");
133             }
134             }
135              
136 488         1264 $self->{max_children} = $val;
137              
138             # Trigger
139 488         1622 $self->_start_queued_children();
140              
141 474         1481 return $val;
142             } else {
143 0         0 confess("Invalid call");
144             }
145             }
146              
147             # XXX: Add validation that _subprocs is a HashRef
148             sub _subprocs {
149 31944 100   31944   148298 if ( $#_ == 0 ) {
    50          
150 31726         97791 my $self = shift;
151              
152             # Initial value
153 31726 100       118829 if ( !exists( $self->{_subprocs} ) ) { $self->{_subprocs} = {}; }
  313         2550  
154              
155 31726         202672 return $self->{_subprocs};
156             } elsif ( $#_ == 1 ) {
157 218         2582 my ( $self, $val ) = @_;
158 218         20859 $self->{_subprocs} = $val;
159 218         4483 return $val;
160             } else {
161 0         0 confess("Invalid call");
162             }
163             }
164              
165             # XXX: Add validation that _count is a positive integer
166             # This only gets used on Win32.
167             sub _count {
168 218 50   218   6455 if ( $#_ == 0 ) {
    50          
169 0         0 my $self = shift;
170              
171             # Initial value
172 0 0       0 if ( !exists( $self->{_count} ) ) { $self->{_count} = 1; }
  0         0  
173              
174 0         0 return $self->{_count};
175             } elsif ( $#_ == 1 ) {
176 218         2543 my ( $self, $val ) = @_;
177 218         4510 $self->{_count} = $val;
178 218         2647 return $val;
179             } else {
180 0         0 confess("Invalid call");
181             }
182             }
183              
184             # XXX: Add validation that _parent_pid is a positive integer
185             # We also need to initialize always in the parent process.
186             sub _parent_pid {
187 325 50   325   1856 if ( $#_ == 0 ) {
    50          
188 0         0 return shift->{_parent_pid};
189             } elsif ( $#_ == 1 ) {
190 325         1983 my ( $self, $val ) = @_;
191 325         8095 $self->{_parent_pid} = $val;
192 325         870 return $val;
193             } else {
194 0         0 confess("Invalid call");
195             }
196             }
197              
198             # Children queued
199             # XXX: Add validation that _queued_cildren is an
200             # ArrayRef[ArrayRef[CodeRef]]
201             sub _queued_children {
202 16064 100   16064   58666 if ( $#_ == 0 ) {
    50          
203 15846         35245 my $self = shift;
204              
205 15846 100       41021 if ( !exists( $self->{_queued_children} ) ) { $self->{_queued_children} = []; }
  275         1968  
206              
207 15846         66630 return $self->{_queued_children};
208             } elsif ( $#_ == 1 ) {
209 218         2086 my ( $self, $val ) = @_;
210 218         4921 $self->{_queued_children} = $val;
211 218         1676 return $val;
212             } else {
213 0         0 confess("Invalid call");
214             }
215             }
216              
217              
218             sub new {
219 325     325 1 56861 my $class = shift;
220 325         838 my $self = {};
221 325         1182 bless $self, $class;
222              
223             # Initialize parent PID
224 325         2323 $self->_parent_pid($$);
225              
226             # Make a weak reference and shove it into the ALL_WU array
227 325         642 my $weakself = $self;
228 325         1674 weaken $weakself;
229 325         798 push @ALL_WU, \$weakself;
230              
231             # Do some housekeeping on @ALL_WU, so it is somewhat bounded
232 325         846 @ALL_WU = grep { defined $$_ } @ALL_WU;
  471         1547  
233              
234             # Do we have any arguments?
235 325 100       1359 if (scalar(@_) > 0) {
236 92 100       322 my %args = (scalar(@_) == 1) ? %{shift()} : @_;
  46         138  
237              
238 92 50       276 if (exists $args{use_anyevent}) {
239 0         0 $self->use_anyevent($args{use_anyevent});
240             }
241 92 50       276 if (exists $args{max_children}) {
242 92         230 $self->max_children($args{max_children});
243             }
244             }
245              
246 325         1185 return $self;
247             }
248              
249              
250             sub async {
251 4250 50   4250 1 55139179 if ( $#_ < 1 ) { confess 'invalid call'; }
  0         0  
252 4250         11789 my $self = shift;
253 4250         14469 my $sub = shift;
254              
255             # Test $sub to make sure it is a code ref or a sub ref
256 4250 100       44506 if ( !_codelike($sub) ) {
257 7         728 confess("Parameter to async() is not a code (or codelike) reference");
258             }
259              
260 4243         8842 my $callback;
261 4243 100       15678 if ( scalar(@_) == 0 ) {
    50          
262             # No callback provided
263              
264 2487         14356 my $cbnum = $self->_ordered_count;
265 2487         8484 $self->_ordered_count( $cbnum + 1 );
266              
267             # We create a callback that populates the ordered responses
268 2487         7916 my $selfref = $self;
269 2487         13114 weaken $selfref;
270             $callback = sub {
271 1931 50   1931   7590 if ( defined $selfref ) { # In case this went away
272 1931         3855 @{ $selfref->_ordered_responses }[$cbnum] = shift;
  1931         6353  
273             }
274 2487         59225 };
275             } elsif ( scalar(@_) == 1 ) {
276             # Callback provided
277 1756         3470 $callback = shift;
278             } else {
279 0         0 confess 'invalid call';
280             }
281              
282             # If there are pending errors, throw that.
283 4243 50       25784 if ( defined( $self->_last_error ) ) { die( $self->_last_error ); }
  0         0  
284              
285 4243         85774 my $pipe = [ pipely() ];
286              
287 4243         970496 my $pid = fork();
288              
289 4243 100       6656858 if ($pid) {
290             # We are in the parent process
291              
292 4031         369973 $pipe = $pipe->[0];
293              
294 4031         443164 $self->_subprocs()->{$pid} = {
295             fh => $pipe,
296             watcher => undef,
297             callback => $callback,
298             caller => [ caller() ],
299             };
300              
301             # Set up anyevent listener if appropriate
302 4031 50       111707 if ( $self->use_anyevent() ) {
303 0         0 $self->_add_anyevent_watcher($pid);
304             }
305              
306 4031         171209 return $pid;
307              
308             } else {
309             # We are in the child process
310 212         28094 $pipe = $pipe->[1];
311              
312 212         24128 return $self->_child( $sub, $pipe );
313             }
314             }
315              
316              
317             sub asyncs {
318 79 50   79 1 554 if ( $#_ < 2 ) { confess 'invalid call'; }
  0         0  
319 79         291 my $self = shift;
320 79         211 my $children = shift;
321 79         212 my $sub = shift;
322 79 50       422 if ( scalar(@_) > 1 ) { confess("invalid call"); }
  0         0  
323              
324 79 50       2002 if ( $children !~ m/^[1-9][0-9]*$/s ) {
325 0         0 confess("Number of children must be a numeric value > 0");
326             }
327              
328 79         583 for ( my $i = 0; $i < $children; $i++ ) {
329 655     30   8051 $self->async( sub { return $sub->($i); }, @_ );
  30         233  
330             }
331              
332 49         1660 return $children;
333             }
334              
335             sub _child {
336 212 50   212   8386 if ( scalar(@_) != 3 ) { confess 'invalid call'; }
  0         0  
337 212         4866 my ( $self, $sub, $pipe ) = @_;
338              
339             # Cleanup ALL_WU
340 212         8695 @ALL_WU = grep { defined $$_ } @ALL_WU;
  263         8726  
341 212         2639 foreach my $wu ( map { $$_ } @ALL_WU ) {
  218         4917  
342 218         10161 $wu->_clear_all();
343             }
344              
345             try {
346 212     212   50371 my $result = $sub->();
347 205         3268377 $self->_send_result( $pipe, $result );
348             } catch {
349 7     7   2392 $self->_send_error( $pipe, $_ );
350 212         10788 };
351              
352 210         130075 exit();
353             }
354              
355              
356             sub waitall {
357 3367 50   3367 1 45657 if ( $#_ != 0 ) { confess 'invalid call'; }
  0         0  
358 3367         16836 my ($self) = @_;
359              
360             # No subprocs? Just return.
361 3367 100       7052 if ( scalar( keys %{ $self->_subprocs } ) == 0 ) {
  3367         13475  
362 387 50       4468 if ( $self->use_anyevent ) {
363 0         0 $self->_cv( AnyEvent->condvar );
364             }
365              
366 387         3491 return $self->_get_and_reset_ordered_responses();
367             }
368              
369             # Using cv?
370 2980 50       20596 if ( defined( $self->_cv ) ) {
371 0         0 $self->_cv->recv();
372 0 0       0 if ( defined( $self->_last_error ) ) {
373 0         0 my $err = $self->_last_error;
374 0         0 $self->_last_error(undef);
375              
376 0         0 die($err);
377             }
378              
379 0         0 return $self->_get_and_reset_ordered_responses();
380             }
381              
382             # Tail recursion
383 2980 50       10192 if ( $self->_waitone() ) { goto &waitall }
  2947         24670  
384              
385 0         0 return @{ $self->_get_and_reset_ordered_responses() };
  0         0  
386             }
387              
388             # Gets the _ordered_responses and returns the reference. Also
389             # resets the _ordered_responses and the _ordered_counts to an
390             # empty list and zero respectively.
391             sub _get_and_reset_ordered_responses {
392 387 50   387   2852 if ( $#_ != 0 ) { confess 'invalid call'; }
  0         0  
393 387         1694 my $self = shift;
394              
395 387         1432 my (@r) = @{ $self->_ordered_responses() };
  387         2765  
396              
397 387         3188 $self->_ordered_responses( [] );
398 387         3801 $self->_ordered_count(0);
399              
400 387         9652 return @r;
401             }
402              
403              
404             sub waitone {
405 372 50   372 1 173280 if ( $#_ != 0 ) { confess 'invalid call'; }
  0         0  
406 372         2714 my ($self) = @_;
407              
408 372         3230 my $rv = $self->_waitone();
409              
410             # Using AnyEvent?
411 368 50       6285 if ( defined( $self->_last_error ) ) {
412 0         0 my $err = $self->_last_error;
413 0         0 $self->_last_error(undef);
414              
415 0         0 die($err);
416             }
417              
418 368         5552 return $rv;
419             }
420              
421             # Meat of waitone (but doesn't handle returning an exception when using
422             # anyevent)
423             sub _waitone {
424 3352 50   3352   12890 if ( $#_ != 0 ) { confess 'invalid call'; }
  0         0  
425 3352         8853 my ($self) = @_;
426              
427 3352         10183 my $sp = $self->_subprocs();
428 3352         15763 weaken $sp; # To avoid some Windows warnings
429 3352 100       14590 if ( !keys(%$sp) ) { return; }
  8         84  
430              
431             # On everything but Windows
432             #
433 3344         72528 my $s = IO::Select->new();
434 3344         69163 foreach ( keys(%$sp) ) { $s->add( $sp->{$_}{fh} ); }
  14564         561537  
435              
436 3344         164400 my @ready = $s->can_read();
437              
438 3344         2546129 foreach my $fh (@ready) {
439 3344         17368 foreach my $child ( keys(%$sp) ) {
440 8607 50       92029 if ( defined( $fh->fileno() ) ) {
441 8607 100       66054 if ( $fh->fileno() == $sp->{$child}{fh}->fileno() ) {
442 3344         48385 $self->_read_result($child);
443              
444 3325         637342628 waitpid( $child, 0 );
445              
446             # Start queued children, if needed
447 3325         51502 $self->_start_queued_children();
448              
449 3307         95157 return 1; # We don't want to read more than one!
450             }
451             }
452             }
453             }
454              
455             # We should never get here
456 0         0 return;
457             }
458              
459              
460             ## no critic ('Subroutines::ProhibitBuiltinHomonyms')
461             sub wait {
462 8 50   8 1 2049 if ( $#_ != 1 ) { confess 'invalid call'; }
  0         0  
463 8         107 my ( $self, $pid ) = @_;
464              
465 8         185 my $rv = $self->_wait($pid);
466              
467 8 50       124 if ( defined( $self->_last_error ) ) {
468 0         0 my $err = $self->_last_error;
469 0         0 $self->_last_error(undef);
470              
471 0         0 die($err);
472             }
473              
474 8         100 return $rv;
475             }
476              
477             # Internal version that doesn't check for AnyEvent die needs
478             sub _wait {
479 8 50   8   203 if ( $#_ != 1 ) { confess 'invalid call'; }
  0         0  
480 8         104 my ( $self, $pid ) = @_;
481              
482 8 100       155 if ( !exists( $self->_subprocs()->{$pid} ) ) {
483              
484             # We don't warn/die because it's possible that there is
485             # a race between callback and here, in the main thread.
486 4         63 return;
487             }
488              
489 4         199 my $result = $self->_read_result($pid);
490              
491 4         2979210 waitpid( $pid, 0 );
492              
493 4         173 return $result;
494             }
495             ## use critic
496              
497              
498             sub count {
499 7292 50   7292 1 3372666 if ( $#_ != 0 ) { confess 'invalid call'; }
  0         0  
500 7292         18837 my ($self) = @_;
501              
502 7292         26629 my $sp = $self->_subprocs();
503 7292         132394 return scalar( keys %$sp );
504             }
505              
506              
507             sub queue {
508 2302 50   2302 1 60525043 if ( $#_ < 1 ) { confess 'invalid call'; }
  0         0  
509 2302         5247 my $self = shift;
510 2302         3303 my $sub = shift;
511              
512             # Test $sub to make sure it is a code ref or a sub ref
513 2302 100       13893 if ( !_codelike($sub) ) {
514 7         1687 confess("Parameter to queue() is not a code (or codelike) reference");
515             }
516              
517 2295         3945 my $callback;
518 2295 100       6978 if ( scalar(@_) == 0 ) {
    50          
519             # We're okay, don't need to do anything - no callback
520             } elsif ( scalar(@_) == 1 ) {
521             # We have a callback
522 1136         1708 $callback = shift;
523             } else {
524 0         0 confess 'invalid call';
525             }
526              
527             # If there are pending errors, throw that.
528 2295 50       8494 if ( defined( $self->_last_error ) ) { die( $self->_last_error ); }
  0         0  
529              
530 2295         3927 push @{ $self->_queued_children }, [ $sub, $callback ];
  2295         4924  
531 2295         6597 return $self->_start_queued_children();
532             }
533              
534             sub _send_result {
535 205 50   205   2890 if ( $#_ != 2 ) { confess 'invalid call'; }
  0         0  
536 205         2030 my ( $self, $fh, $msg ) = @_;
537              
538 205         3552 return $self->_send( $fh, 'RESULT', $msg );
539             }
540              
541             sub _send_error {
542 7 50   7   420 if ( $#_ != 2 ) { confess 'invalid call'; }
  0         0  
543 7         175 my ( $self, $fh, $err ) = @_;
544              
545 7         258 return $self->_send( $fh, 'ERROR', $err );
546             }
547              
548             sub _send {
549 212 50   212   2402 if ( $#_ != 3 ) { confess 'invalid call'; }
  0         0  
550 212         3569 my ( $self, $fh, $type, $data ) = @_;
551              
552 212         1259 my $msg;
553 212 100 100     6433 if ( blessed($data) && ($data->can('FREEZE')) && ($data->can('THAW')) ) {
      66        
554 1         32 $msg = ref($data) . "!::!" . $data->FREEZE();
555             } else {
556 211         8382 $msg = "!::!" . Storable::freeze( \$data );
557             }
558              
559 210 50       94880 if ( !defined($msg) ) {
560 0         0 die 'freeze() returned undef for child return value';
561             }
562              
563 210         18074 $fh->write($type);
564 210         17532 $fh->write("\n");
565              
566 210         6509 $fh->write( length($msg) );
567 210         6102 $fh->write("\n");
568              
569 210         9124 binmode( $fh, ':raw' );
570              
571 210         134215 $fh->write($msg);
572              
573 210         33795 $fh->close();
574 210         7460 return;
575             }
576              
577             sub _read_result {
578 3348 50   3348   12032 if ( $#_ != 1 ) { confess 'invalid call'; }
  0         0  
579 3348         10163 my ( $self, $child ) = @_;
580              
581 3348         10929 my $cinfo = $self->_subprocs()->{$child};
582 3348 50       11400 if (defined($cinfo->{rawbuff})) {
583 0         0 return $self->_read_result_from_buffer($child);
584             } else {
585 3348         16321 return $self->_read_result_from_fh($child);
586             }
587             }
588              
589             sub _read_result_from_buffer {
590 0 0   0   0 if ( $#_ != 1 ) { confess 'invalid call'; }
  0         0  
591 0         0 my ( $self, $child ) = @_;
592              
593 0         0 my $cinfo = $self->_subprocs()->{$child};
594 0         0 $cinfo->{fh}->close();
595              
596 0         0 my ($type, $size, $buffer) = split /\n/, $cinfo->{rawbuff}, 3;
597 0         0 delete $cinfo->{rawbuff};
598              
599 0 0       0 if ( !defined($type) ) { die 'Could not read child data'; }
  0         0  
600 0 0       0 if ( !defined($size) ) { die 'Could not read child data'; }
  0         0  
601              
602 0         0 my ($class, $frozen) = split("!::!", $buffer, 2);
603 0         0 my $data;
604 0 0       0 if ($class eq "") {
605 0         0 $data = ${ Storable::thaw($frozen) };
  0         0  
606             } else {
607 0         0 $data = $class->THAW($frozen);
608             }
609              
610              
611 0         0 my $caller = $self->_subprocs()->{$child}{caller};
612 0         0 delete $self->_subprocs()->{$child};
613              
614 0 0       0 if ( $type eq 'RESULT' ) {
615 0         0 $cinfo->{callback}->($data);
616             } else {
617 0         0 my $err =
618             "Child (created at "
619             . $caller->[1]
620             . " line "
621             . $caller->[2]
622             . ") died with error: $data";
623              
624 0 0       0 if ( $self->use_anyevent ) {
625             # Can't throw events with anyevent
626 0         0 $self->_last_error($err);
627             } else {
628             # Otherwise we do throw it
629 0         0 die($err);
630             }
631             }
632              
633 0         0 return;
634             }
635              
636             sub _read_result_from_fh {
637 3348 50   3348   15191 if ( $#_ != 1 ) { confess 'invalid call'; }
  0         0  
638 3348         9465 my ( $self, $child ) = @_;
639              
640 3348         9449 my $cinfo = $self->_subprocs()->{$child};
641 3348         7484 my $fh = $cinfo->{fh};
642              
643 3348         130113 my $type = <$fh>;
644 3348 50       26112 if ( !defined($type) ) { die 'Could not read child data'; }
  0         0  
645 3348         19388 chomp($type);
646              
647 3348         13576 my $size = <$fh>;
648 3348         11701 chomp($size);
649              
650 3348         32300 binmode($fh);
651              
652 3348         677435 my $result = '';
653              
654 3348         8274 my $ret = 1;
655 3348   66     55023 while ( defined($ret) && ( length($result) < $size ) ) {
656 3348         18743 my $s = $size - length($result);
657              
658 3348         9302 my $part = '';
659 3348         28515 $ret = $fh->read( $part, $s );
660 3348 50       181044 if ( defined($ret) ) { $result .= $part; }
  3348         75537  
661             }
662              
663 3348         82241 my ($class, $frozen) = split("!::!", $result, 2);
664 3348         9846 my $data;
665 3348 100       11644 if ($class eq "") {
666 3347         6712 $data = ${ Storable::thaw($frozen) };
  3347         25437  
667             } else {
668 1         30 $data = $class->THAW($frozen);
669             }
670              
671 3348         468857 my $caller = $self->_subprocs()->{$child}{caller};
672 3348         8990 delete $self->_subprocs()->{$child};
673 3348         20354 $fh->close();
674              
675 3348 100       104223 if ( $type eq 'RESULT' ) {
676 3329         20107 $cinfo->{callback}->($data);
677             } else {
678 19         760 my $err =
679             "Child (created at "
680             . $caller->[1]
681             . " line "
682             . $caller->[2]
683             . ") died with error: $data";
684              
685 19 50       569 if ( $self->use_anyevent ) {
686             # Can't throw events with anyevent
687 0         0 $self->_last_error($err);
688             } else {
689             # Otherwise we do throw it
690 19         1220 die($err);
691             }
692             }
693              
694 3329         70224 return;
695             }
696              
697             # Start queued children, if possible.
698             # Returns 1 if children were started, undef otherwise
699             sub _start_queued_children {
700 6108 50   6108   29034 if ( $#_ != 0 ) { confess 'invalid call' }
  0         0  
701 6108         20218 my ($self) = @_;
702              
703 6108 100       11942 if ( !( @{ $self->_queued_children } ) ) { return; }
  6108         18268  
  2937         8143  
704 3171 50       11493 if ( defined( $self->_last_error ) ) { return; } # Do not queue if there are errors
  0         0  
705              
706             # Can we start a queued process?
707 3171         7725 while ( scalar @{ $self->_queued_children } ) {
  5260         45462  
708 4011 100 100     24453 if ( ( !defined( $self->max_children ) ) || ( $self->count < $self->max_children ) ) {
709             # Start queued child
710 2183         3963 my $ele = shift @{ $self->_queued_children };
  2183         5509  
711 2183 100       8784 if ( !defined( $ele->[1] ) ) {
712 1103         4842 $self->async( $ele->[0] );
713             } else {
714 1080         4423 $self->async( $ele->[0], $ele->[1] );
715             }
716             } else {
717             # Can't unqueue
718 1828         11953 return;
719             }
720             }
721              
722             # We started at least one process
723 1249         40630 return 1;
724             }
725              
726             # Sets up AnyEvent or tears it down as needed
727             sub _set_anyevent {
728 0 0   0   0 if ( $#_ < 1 ) { confess 'invalid call' }
  0         0  
729 0 0       0 if ( $#_ > 2 ) { confess 'invalid call' }
  0         0  
730 0         0 my ( $self, $new, $old ) = @_;
731              
732 0 0 0     0 if ( ( !$old ) && $new ) {
    0 0        
733             # We are setting up AnyEvent
734 0         0 require AnyEvent;
735              
736 0 0       0 if ( defined( $self->_subprocs() ) ) {
737 0         0 foreach my $pid ( keys %{ $self->_subprocs() } ) {
  0         0  
738 0         0 $self->_add_anyevent_watcher($pid);
739             }
740             }
741              
742 0         0 $self->_cv( AnyEvent->condvar );
743              
744             } elsif ( $old && ( !$new ) ) {
745             # We are tearing down AnyEvent
746              
747 0 0       0 if ( defined( $self->_subprocs() ) ) {
748 0         0 foreach my $pid ( keys %{ $self->_subprocs() } ) {
  0         0  
749 0         0 my $proc = $self->_subprocs()->{$pid};
750              
751 0         0 $proc->{watcher} = undef;
752             }
753             }
754              
755 0         0 $self->_cv(undef);
756             }
757 0         0 return;
758             }
759              
760             # Sets up the listener for AnyEvent
761             sub _add_anyevent_watcher {
762 0 0   0   0 if ( $#_ != 1 ) { confess 'invalid call' }
  0         0  
763 0         0 my ( $self, $pid ) = @_;
764              
765 0         0 my $proc = $self->_subprocs()->{$pid};
766              
767             $proc->{watcher} = AnyEvent->io(
768             fh => $proc->{fh},
769             poll => 'r',
770             cb => sub {
771 0     0   0 $self->_wait($pid);
772 0 0       0 if ( scalar( keys %{ $self->_subprocs() } ) == 0 ) {
  0         0  
773 0         0 my $oldcv = $self->_cv;
774 0         0 $self->_cv( AnyEvent->condvar );
775 0         0 $oldcv->send();
776             }
777              
778             # Start queued children, if needed
779 0         0 $self->_start_queued_children();
780             },
781 0         0 );
782              
783 0         0 return;
784             }
785              
786             # Used to clear all sub-processes, etc, in child process.
787             sub _clear_all {
788 218 50   218   6061 if ( $#_ != 0 ) { confess 'invalid call' }
  0         0  
789 218         2821 my ( $self ) = @_;
790              
791 218         8089 $self->_cv(undef);
792 218         4716 $self->_last_error(undef);
793 218         4288 $self->_ordered_count(0);
794 218         4912 $self->_ordered_responses( [] );
795 218         4236 $self->_count(1);
796 218         3298 $self->_queued_children( [] );
797              
798 218         1059 do {
799             # Don't warn on AnyEvent in child threads being DESTROYed
800 218     0   15969 local $SIG{__WARN__} = sub { };
801 218         5004 $self->_subprocs( {} );
802             };
803              
804 218         1863 return;
805             }
806              
807              
808             sub start {
809 5 50   5 1 23813 if ( $#_ != 1 ) { confess 'invalid call'; }
  0         0  
810 5         18 my $self = shift;
811 5         10 my $sub = shift;
812              
813             # Test $sub to make sure it is a code ref or a sub ref
814 5 50       41 if ( !_codelike($sub) ) {
815 0         0 confess("Parameter to start() is not a code (or codelike) reference");
816             }
817              
818 5         57 my $pid = fork();
819              
820 5 100       10710 if ( !$pid ) {
821             # We are in the child process.
822 2         183 $sub->();
823 2         1024 exit();
824             }
825              
826 3         126 return;
827             }
828              
829             # Tests to see if something is codelike
830             #
831             # Borrowed from Params::Util (written by Adam Kennedy)
832             sub _codelike {
833 6557 50   6557   25096 if ( scalar(@_) != 1 ) { confess 'invalid call' }
  0         0  
834 6557         14799 my $thing = shift;
835              
836 6557 100       44348 if ( reftype($thing) ) { return 1; }
  6543         24002  
837 14 50 33     56 if ( blessed($thing) && overload::Method( $thing, '()' ) ) { return 1; }
  0         0  
838              
839 14         49 return;
840             }
841              
842             # Destructor emits warning if sub processes are running
843             sub DESTROY {
844 284     284   37650 my $self = shift;
845              
846 284 100       2324 if ( scalar( keys %{ $self->_subprocs } ) ) {
  284         1532  
847 1         101 warn "Warning: Subprocesses running when Parallel::WorkUnit object destroyed\n";
848             }
849              
850 284         54841 return;
851             }
852              
853             1;
854              
855             __END__