File Coverage

blib/lib/Parallel/Queue.pm
Criterion Covered Total %
statement 96 110 87.2
branch 60 90 66.6
condition 10 14 71.4
subroutine 11 12 91.6
pod 0 6 0.0
total 177 232 76.2


line stmt bran cond sub pod time code
1             ########################################################################
2             # housekeeping
3             ########################################################################
4              
5             package Parallel::Queue;
6 90     90   1510006 use v5.12;
  90         276  
  90         2967  
7              
8 90     90   373 use strict;
  90         95  
  90         2771  
9              
10 90     90   371 use Carp qw( croak );
  90         176  
  90         4674  
11 90     90   371 use Scalar::Util qw( blessed looks_like_number );
  90         178  
  90         7240  
12 90     90   4381 use Symbol qw( qualify_to_ref );
  90         6231  
  90         83903  
13              
14             ########################################################################
15             # package variables
16             ########################################################################
17              
18             our $VERSION = '3.6';
19             $VERSION = eval $VERSION;
20              
21             # defaults.
22              
23             my $fork = '';
24             my $verbose = '';
25             my $finish = '';
26              
27             ########################################################################
28             # execution handlers
29             ########################################################################
30              
31             sub next_job
32             {
33 3505     3505 0 6051 state $next = '';
34 3505         17165 my $job = '';
35              
36             JOB:
37 3505         92856 for(;;)
38             {
39 3517 100       10480 if( $next )
40             {
41 48 100       102 $job = $next->( $_[0] )
42             and last;
43              
44 6 50       36 say STDERR "Completed iterator: '$_[0]'"
45             if $verbose;
46              
47 6         10 $next = '';
48 6         452 shift;
49             }
50              
51 3475 100       9651 @_ or last;
52              
53 3464         16870 for my $item ( $_[0] )
54             {
55 3464 100 66     34073 if
    100          
    50          
56             (
57             blessed $item
58             and
59             $next = $item->can( 'next_job' )
60             )
61             {
62 6 50       35 say STDERR "New iterator: '$next' ($_[0])"
63             if $verbose;
64              
65             next JOB
66 6         15 }
67             elsif( ref $item )
68             {
69 3452         28077 $job = shift;
70             }
71             elsif( $item )
72             {
73             # these might end up being processing directives
74             # later on, for now ignore them.
75              
76 6         576 say STDERR "Discarding non-queue '$item'";
77              
78             shift
79 6         22 }
80             else
81             {
82             # silently ignore empty filler.
83              
84 0         0 shift;
85             }
86             }
87              
88 3458 100       14478 last if $job;
89             }
90              
91 3505 100       20399 say STDERR "Next job: '$job'"
92             if $verbose;
93              
94 3505 100       21702 $job
95             or return
96             }
97              
98             sub run_nofork
99             {
100             # discard the count, iterate the queue without forking.
101              
102 13     13 0 17 shift;
103              
104 13 100       463 say STDERR "Non-forking queue"
105             if $verbose;
106              
107 13         38 while( my $sub = &next_job )
108             {
109             # these should all exit zero.
110              
111 174         212 my $result = eval { $sub->() };
  174         374  
112              
113 174 100       35071 say STDERR "Complete: '$result' ($@)"
114             if $verbose;
115              
116 174 100       1338 if( $result )
    50          
117             {
118 2         332 say STDERR "Non-zero exit: $result, aborting queue";
119              
120             last
121 2         7 }
122             elsif( $@ )
123             {
124 0         0 say STDERR "Error in job: $@";
125              
126             last
127 0         0 }
128             }
129              
130             return
131 13         434 }
132              
133             sub fork_job
134             {
135             @_
136             or do
137 3320 50   3320 0 12202 {
138 0 0       0 say STDERR "Queue empty."
139             if $verbose;
140              
141             return
142 0         0 };
143              
144 3320 50       16347 my $job = &next_job
145             or return;
146              
147 3320 100       1822937 if( ( my $pid = fork ) > 0 )
    50          
148             {
149 3240 50       23028 say STDERR "fork: $pid"
150             if $verbose;
151              
152             # nothing useful to hand back.
153              
154             return
155 3240         496860514 }
156             elsif( defined $pid )
157             {
158             # child passes the exit status of the perl sub call
159             # to the caller as our exit status. the O/S will deal
160             # with signal values.
161             #
162             # aside: failing to exit here will cause runaway
163             # phorkatosis.
164              
165 80 50       2017 say STDERR "\tExecuting: '$job'"
166             if $verbose;
167              
168 80         1757 my $exitval = eval { $job->() };
  80         4226  
169              
170 80 50       210681 $@
171             ? die $@
172             : exit $exitval
173             }
174             else
175             {
176             # pass back the fork failure for the caller to deal with.
177              
178 0         0 die "Phorkafobia: $!";
179             }
180             };
181              
182             sub fork_queue
183             {
184             # count was validated in runqueue.
185              
186 204     204 0 326 my $count = shift;
187              
188             # what's left on the stack are the jobs to run.
189             # which may be none.
190             # if so, we're done.
191              
192 204 50       612 say STDERR "Forking initial $count jobs"
193             if $verbose;
194              
195 204         795 &fork_job for 1 .. $count;
196              
197 194         36721100 while( (my $pid = wait) > 0 )
198             {
199 3132 50       20655 say STDERR "Complete: $pid ($?)"
200             if $verbose;
201              
202             # this assumes normal *NIX 16-bit exit values,
203             # with a status in the high byte and signum
204             # in the lower. notice that $status is not
205             # masked to 8 bits, however. this allows us to
206             # deal with non-zero exits on > 16-bit systems.
207             #
208             # caller can trap the signals.
209              
210 3132 50       22800 if( $? )
211             {
212             # bad news, boss...
213              
214             my $message
215             = do
216 0         0 {
217 0 0       0 if( my $exit = $? >> 8 )
    0          
218             {
219 0         0 "exit( $exit ) by $pid"
220             }
221             elsif( my $signal = $? & 0xFF )
222             {
223 0         0 "kill SIG-$signal on $pid"
224             }
225             };
226              
227 0 0       0 $finish
228             ? warn $message
229             : die $message
230             ;
231             }
232              
233             # kick off another job if the queue is not empty.
234              
235 3132 100       2114311 @_ and &fork_job;
236             }
237              
238             return
239 124         1236 };
240              
241             # debug or zero count run the jobs without forking,
242             # simplifies most debugging issues.
243              
244             sub runqueue
245             {
246 217     217 0 316528 my ( $count ) = @_;
247              
248 217 50       1267 looks_like_number $count
249             or croak "Bogus runqueue: '$count' non-numeric";
250              
251 217 50       743 $count < 0
252             and croak "Bogus runqueue: negative count ($count)";
253              
254             $fork && $count
255 204         408 ? eval { &fork_queue }
256 217 100 100     1633 : eval { &run_nofork }
  13         37  
257             ;
258              
259             # return the unused portion.
260             # this includes any incomplete iterators.
261              
262             @_
263 137         2162 }
264              
265             sub import
266             {
267             # discard the current package name and deal
268             # with the args. empty arg for 'export'
269             # indicates that runqueue needs to be exported.
270              
271 90     90   470 my $caller = caller;
272              
273 90         176 shift;
274              
275 90 100       278 @_ or unshift @_, qw( export );
276              
277 90         95 my $export = 1;
278 90         92 my $subname = 'runqueue';
279              
280 90         817 $fork = ! $^P;
281 90         93 $verbose = '';
282 90         91 $finish = '';
283              
284 90         183 for my $arg ( @_ )
285             {
286 90         194 my( $name, $value ) = split /=/, $arg;
287              
288 90   100     536 $value //= 1;
289              
290 90 100       282 $value = ! $value
291             if $name =~ s/^no//;
292              
293 90 100       285 if( 'fork' eq $name )
    50          
    100          
    100          
    50          
294             {
295 85         173 $fork = $value;
296             }
297             elsif( 'verbose' eq $name )
298             {
299 0         0 $verbose = $value;
300             }
301             elsif( 'finish' eq $name )
302             {
303 1         2 $finish = $value;
304             }
305             elsif( 'debug' eq $name )
306             {
307 1 50       2 if( $value )
308             {
309 1         2 $fork = '';
310 1         2 $verbose = 1;
311             }
312             }
313             elsif( 'export' eq $name )
314             {
315 3         5 $export = !! $value;
316              
317 3 100       16 looks_like_number $value
318             or $subname = $value;
319             }
320             else
321             {
322 0         0 warn "Unknown argument: '$arg' ignored";
323             }
324             }
325              
326 90 50 66     606 if( $fork && $^P && ! $DB::fork_TTY )
      33        
327             {
328 83         12163 say STDERR
329             'Debugger forking without $DB::fork_TTY; expect problems';
330             }
331              
332 90 50       349 if( $export )
333             {
334 90         352 my $ref = qualify_to_ref $subname, $caller;
335              
336 90         1794 undef &{ *$ref };
  90         453  
337              
338 90         356 *$ref = \&runqueue
339             }
340              
341             return
342 90         156812 }
343              
344             sub configure
345             {
346 0 0   0 0   @_ and import @_, qw( noexport );
347             }
348              
349             # keep require happy
350              
351             1
352              
353             __END__