File Coverage

blib/lib/Parallel/Async/Task.pm
Criterion Covered Total %
statement 105 120 87.5
branch 19 34 55.8
condition 2 4 50.0
subroutine 28 33 84.8
pod 8 9 88.8
total 162 200 81.0


line stmt bran cond sub pod time code
1             package Parallel::Async::Task;
2 41     41   768 use 5.008005;
  41         119  
  41         1655  
3 41     41   246 use strict;
  41         67  
  41         1292  
4 41     41   217 use warnings;
  41         67  
  41         1284  
5              
6 41     41   452821 use Try::Tiny;
  41         67104  
  41         2135  
7 41     41   39580 use Storable ();
  41         137667  
  41         1650  
8 41     41   294 use File::Spec;
  41         69  
  41         1007  
9 41     41   38482 use POSIX ":sys_wait_h";
  41         291975  
  41         330  
10 41     41   94879 use Time::HiRes ();
  41         67671  
  41         4095  
11              
12             (my $TMPDIR_BASENAME = __PACKAGE__) =~ s!::!-!g;
13              
14             our $WANTARRAY;
15             our $EXIT_CODE;
16              
17             our $WAIT_INTERVAL = 0.1 * 1000 * 1000;
18              
19 41     41   1001934 use Class::Accessor::Lite ro => [qw/parent_pid child_pid/];
  41         46188  
  41         290  
20 41     41   419329 use Parallel::Async::Chain;
  41         78  
  41         47141  
21              
22             sub new {
23 173     173 1 876 my ($class, %args) = @_;
24 173         2188 return bless +{
25             %args,
26             parent_pid => $$,
27             clild_pid => undef,
28             already_run_fg => 0,
29             } => $class;
30             }
31              
32             sub recv :method {
33 70     70 1 23074 my ($self, @args) = @_;
34              
35 70         219 local $WANTARRAY = wantarray;
36              
37 70         373 $self->run(@args);
38 47         1580 $self->_wait();
39              
40 47         1264 my $ret = $self->read_child_result();
41 47 100       1373 return $WANTARRAY ? @$ret : $ret->[0];
42             }
43              
44             sub as_anyevent_child {
45 0     0 1 0 my ($self, $cb, @args) = @_;
46              
47 0         0 local $WANTARRAY = 1;
48              
49 0         0 $self->run(@args);
50              
51 0         0 require AnyEvent;
52             return AnyEvent->child(
53             pid => $self->{child_pid},
54             cb => sub {
55 0     0   0 my ($pid, $status) = @_;
56              
57 0         0 my $ret = $self->read_child_result();
58 0 0       0 return $cb->($pid, $status, $WANTARRAY ? @$ret : $ret->[0]);
59             }
60 0         0 );
61             }
62              
63             sub run {
64 234     234 1 562 my ($self, @args) = @_;
65 234 100       1109 die 'this task already run.' if $self->{already_run_fg};
66              
67 226         326905 my $pid = fork;
68 226 50       9486 die $! unless defined $pid;
69              
70 226         2836 $self->{already_run_fg} = 1;
71 226 100       10245 if ($pid == 0) {## child
72 37         5554 $self->{child_pid} = $$;
73 37         2969 return $self->_run_on_child(@args);
74             }
75             else {## parent
76 189         2900 $self->{child_pid} = $pid;
77 189         10672 return $self->_run_on_parent(@args);
78             }
79             }
80              
81             sub daemonize {
82 3     3 1 9 my ($self, @args) = @_;
83              
84 3         6 my $orig = $self->{code};
85             local $self->{code} = sub {
86 2     2   4251 my $pid = fork;
87 2 50       277 die $! unless defined $pid;
88              
89 2 100       118 if ($pid == 0) {## child
90 1         74 $orig->(@args);
91 1         1004049 exit 0;
92             }
93             else {## parent
94 1         35 return $pid;
95             }
96 3         33 };
97              
98 3         84 return $self->recv();
99             }
100              
101             sub _run_on_parent {
102 189     189   654 my $self = shift;
103 189         7875 return $self->{child_pid};
104             }
105              
106             sub _run_on_child {
107 37     37   831 my ($self, @args) = @_;
108              
109 37         1167 local $EXIT_CODE = 0;
110              
111 37         937 my $orig = $self->{code};
112             my $ret = try {
113 37     37   9554 my @ret;
114              
115             # context proxy
116 37 100       941 if ($WANTARRAY) {
    100          
117 21         316 @ret = $orig->(@args);
118             }
119             elsif (defined $WANTARRAY) {
120 12         249 $ret[0] = $orig->(@args);
121             }
122             else {
123 4         104 $orig->(@args);
124             }
125              
126 34         3360747 return [0, undef, \@ret];
127             }
128             catch {
129 0 0   0   0 $EXIT_CODE = 255 if !$EXIT_CODE; # last resort
130 0 0       0 $EXIT_CODE = $? >> 8 if $? >> 8; # child exit status
131 0 0       0 $EXIT_CODE = $! if $!; # errno
132 0         0 return [1, $_, undef];
133 37         7743 };
134              
135 34         4136 $self->_write_storable_data($ret);
136              
137 34         1240750 CORE::exit($EXIT_CODE);
138             }
139              
140             sub join :method {
141 23     23 1 210 my $self = shift;
142 23         1048 return Parallel::Async::Chain->join($self, @_);
143             }
144              
145             sub _wait {
146 47     47   133 my $self = shift;
147              
148 47         626 my $pid = $self->{parent_pid};
149 47         1448 while ($self->{child_pid} != $pid) {
150 345         6079 $pid = waitpid(-1, WNOHANG);
151 345         34612824 Time::HiRes::usleep($WAIT_INTERVAL);
152 345 50       7458 last if $pid == -1;
153             }
154             }
155              
156             sub _gen_storable_tempfile_path {
157 162     162   736 my $self = shift;
158 162         768365 return File::Spec->catfile(File::Spec->tmpdir, join('-', $TMPDIR_BASENAME, $self->{parent_pid}, $self->{child_pid}) . '.txt');
159             }
160              
161             sub _write_storable_data {
162 34     34   373 my ($self, $data) = @_;
163              
164 34         553 my $storable_tempfile = $self->_gen_storable_tempfile_path();
165             try {
166 34 50   34   3051 Storable::store($data, $storable_tempfile) or die 'faild store.';
167             }
168             catch {
169 0     0   0 warn(qq|The storable module was unable to store the child's data structure to the temp file "$storable_tempfile": | . join(', ', $_));
170 34         1394 };
171             }
172              
173             sub _read_storable_data {
174 128     128   1199 my $self = shift;
175              
176 128         250 my $data;
177              
178 128         577 my $storable_tempfile = $self->_gen_storable_tempfile_path();
179 128 50       9292 if (-e $storable_tempfile) {
180             try {
181 128 50   128   13219 $data = Storable::retrieve($storable_tempfile) or die 'faild retrieve.';
182             }
183             catch {
184 0     0   0 warn(qq|The storable module was unable to retrieve the child's data structure from the temporary file "$storable_tempfile": | . join(', ', $_));
185 128         4756 };
186              
187             # clean up after ourselves
188 128         4374219 unlink $storable_tempfile;
189             }
190              
191 128         1149 return $data;
192             }
193              
194             sub read_child_result {
195 128     128 0 393 my $self = shift;
196 128   50     807 my $data = $self->_read_storable_data() || [];
197              
198 128 50       1059 if ($data->[0]) {## has error
199 0         0 die $data->[1];
200             }
201             else {
202 128   50     1803 return $data->[2] || [];
203             }
204             }
205              
206             sub reset :method {
207 83     83 1 14406 my $self = shift;
208              
209 83         218 $self->{child_pid} = undef;
210 83         198 $self->{already_run_fg} = 0;
211              
212 83         355 return $self;
213             }
214              
215             sub clone {
216 77     77 1 23924 my $self = shift;
217 77         265 my $class = ref $self;
218 77         1034 return $class->new(%$self);
219             }
220              
221             1;
222             __END__