File Coverage

blib/lib/Parallel/Batch.pm
Criterion Covered Total %
statement 45 50 90.0
branch 11 20 55.0
condition n/a
subroutine 7 7 100.0
pod 1 1 100.0
total 64 78 82.0


line stmt bran cond sub pod time code
1             package Parallel::Batch;
2              
3 11     11   249106 use 5.008;
  11         33  
  11         385  
4 11     11   66 use strict;
  11         11  
  11         352  
5 11     11   143 use warnings;
  11         55  
  11         561  
6              
7 11     11   9944 use POSIX qw/:sys_wait_h/;
  11         83050  
  11         77  
8 11     11   23474 use parent qw/Class::Accessor::Fast/;
  11         3267  
  11         55  
9             Parallel::Batch->mk_accessors(qw/jobs code maxprocs progress_cb/);
10              
11             our $VERSION = '0.04';
12              
13              
14             sub run {
15 11     11 1 451 my $self = shift;
16              
17 11         33 my $procs = 0;
18 11         11 my @jobs = @{$self->jobs};
  11         77  
19 11         143 my %inprogress;
20             # progress start(count)
21 11         55 $self->_send_progress(start => scalar @jobs);
22              
23 11         99 while (@jobs) {
24 65 50       283 if ($procs < $self->maxprocs) {
25 65         583 my $job = shift @jobs;
26 65         71730 my $pid = fork();
27 65 50       2941 die unless defined $pid;
28 65 100       648 if ($pid == 0) {
29 10         2343 $self->code->($job);
30 10         35764 exit;
31             }
32 55 50       1564 if ($pid > 0) {
33 55         415 $procs++;
34             # progress new child
35 55         2469 $inprogress{$pid} = $job;
36 55         1590 $self->_send_progress('new', $job);
37             }
38             }
39 55 100       766 if ($procs == $self->maxprocs) {
40 28 50       2381766 if ((my $pid = wait()) > 0) {
41 28         89 $procs--;
42             # progress child finished
43 28         1563 $self->_send_progress('finish', delete $inprogress{$pid});
44             }
45             }
46 55 50       1231 if ((my $pid = waitpid(-1, WNOHANG)) > 0) {
47 0         0 $procs--;
48 0         0 $self->_send_progress('finish', delete $inprogress{$pid});
49             }
50             }
51 1         52 while ($procs > 0) {
52 3 50       213814 if ((my $pid = wait()) > 0) {
53 3         14 $procs--;
54 3         112 $self->_send_progress('finish', delete $inprogress{$pid});
55             }
56             }
57             # progress done
58 1         29 $self->_send_progress('done');
59             }
60              
61             sub _send_progress {
62 98     98   853 my ($self, $type, $arg) = @_;
63              
64 98 50       1632 return unless $self->progress_cb;
65 0           my $cb = $self->progress_cb->{$type};
66 0 0         if (defined $cb)
67             {
68 0           $cb->($arg);
69             }
70             }
71              
72             1;
73             __END__