File Coverage

blib/lib/Parallel/Async/Chain.pm
Criterion Covered Total %
statement 62 66 93.9
branch 5 6 83.3
condition n/a
subroutine 16 17 94.1
pod 6 7 85.7
total 89 96 92.7


line stmt bran cond sub pod time code
1             package Parallel::Async::Chain;
2 41     41   1026 use 5.008005;
  41         127  
  41         1505  
3 41     41   213 use strict;
  41         67  
  41         1057  
4 41     41   199 use warnings;
  41         84  
  41         4085  
5              
6 41     41   236 use Class::Accessor::Lite rw => [qw/tasks/];
  41         75  
  41         273  
7 41     41   2636 use POSIX ":sys_wait_h";
  41         82  
  41         250  
8 41     41   7258 use Time::HiRes ();
  41         115  
  41         4479  
9              
10             sub join :method {
11 63     63 1 210 my $self = shift;
12 63 100       419 $self = bless +{ tasks => [] } => $self unless ref $self;
13              
14 63         127 push @{ $self->{tasks} } => @_;
  63         326  
15              
16 63         196 return $self;
17             }
18              
19             sub recv :method {
20 44     44 1 11832 my ($self, @args) = @_;
21              
22 41     41   223 no warnings 'once';
  41         58  
  41         2215  
23 44         167 local $Parallel::Async::Task::WANTARRAY = 1;
24 41     41   194 use warnings 'once';
  41         62  
  41         10579  
25              
26 44         244 my @pids = $self->run(@args);
27 26         955 $self->_wait(@pids);
28              
29 26         602 return $self->read_child_result();
30             }
31              
32             sub run {
33 61     61 1 240 my ($self, @args) = @_;
34 61         131 return map { $_->run(@args) } @{ $self->{tasks} };
  166         1753  
  61         293  
35             }
36              
37             sub daemonize {
38 0     0 1 0 my ($self, @args) = @_;
39 0         0 return map { $_->daemonize(@args) } @{ $self->{tasks} };
  0         0  
  0         0  
40             }
41              
42             sub _wait {
43 26     26   297 my $self = shift;
44 26         500 my %pids = map { $_ => 1 } @_;
  81         1533  
45              
46 26         693 while (%pids) {
47 320         22256 my $pid = waitpid(-1, WNOHANG);
48 320 50       1775 last if $pid == -1;
49              
50 320 100       3629 delete $pids{$pid} if exists $pids{$pid};
51             }
52             continue {
53 41     41   195 no warnings 'once';
  41         81  
  41         8301  
54 320         32221354 Time::HiRes::usleep($Parallel::Async::Task::WAIT_INTERVAL);
55             }
56             }
57              
58             sub read_child_result {
59 27     27 0 287 my $self = shift;
60 27         282 return map { $_->read_child_result() } @{ $self->{tasks} };
  82         973  
  27         350  
61             }
62              
63             sub reset :method {
64 24     24 1 97128 my $self = shift;
65 24         95 $_->reset for @{ $self->{tasks} };
  24         233  
66 24         246 return $self;
67             }
68              
69             sub clone {
70 17     17 1 8119569 my $self = shift;
71 17         101 my $class = ref $self;
72 17         37 return $class->join(map { $_->clone } @{ $self->{tasks} });
  51         304  
  17         74  
73             }
74              
75             1;
76             __END__