File Coverage

blib/lib/Mojo/IOLoop/ForkCall.pm
Criterion Covered Total %
statement 72 93 77.4
branch 20 30 66.6
condition 7 9 77.7
subroutine 16 19 84.2
pod 2 2 100.0
total 117 153 76.4


line stmt bran cond sub pod time code
1             package Mojo::IOLoop::ForkCall;
2              
3 10     10   1948248 use Mojo::Base 'Mojo::EventEmitter';
  10         67  
  10         42  
4              
5             our $VERSION = '0.19';
6             $VERSION = eval $VERSION;
7              
8 10     10   5979 use Mojo::IOLoop;
  10         342718  
  10         51  
9 10     10   4821 use IO::Pipely 'pipely';
  10         21425  
  10         479  
10 10     10   61 use POSIX ();
  10         22  
  10         169  
11 10     10   45 use Scalar::Util ();
  10         19  
  10         161  
12              
13 10     10   3853 use Perl::OSType 'is_os_type';
  10         3675  
  10         555  
14 10     10   67 use constant IS_WINDOWS => is_os_type('Windows');
  10         17  
  10         33  
15 10     10   678 use constant IS_CYGWIN => $^O eq 'cygwin';
  10         19  
  10         451  
16              
17 10     10   59 use Exporter 'import';
  10         27  
  10         9844  
18             our @EXPORT_OK = qw/fork_call/;
19              
20             has 'ioloop' => sub { Mojo::IOLoop->singleton };
21             has 'serializer' => sub { require Storable; \&Storable::freeze };
22             has 'deserializer' => sub { require Storable; \&Storable::thaw };
23             has 'weaken' => 0;
24              
25             sub run {
26 22     22 1 885731 my ($self, @args) = @_;
27 22     22   126 my $delay = $self->ioloop->delay(sub{ $self->_run(@args) });
  22         3391  
28 22     0   3282 $delay->catch(sub{ $self->emit( error => pop ) });
  0         0  
29 22         860 return $self;
30             }
31              
32             sub _run {
33 22     22   52 my ($self, $job) = (shift, shift);
34 22         37 my ($args, $cb);
35 22 100 100     170 $args = shift if @_ and ref $_[0] eq 'ARRAY';
36 22 100       80 $cb = shift if @_;
37              
38 22         217 my ($r, $w) = pipely;
39              
40 22         26957 my $child = fork;
41 22 50       925 die "Failed to fork: $!" unless defined $child;
42              
43 22 50       713 if ($child == 0) {
44             # child
45              
46             # cleanup running loops
47 0         0 $self->ioloop->reset;
48 0         0 delete $self->{ioloop}; # not sure this is needed
49 0         0 Mojo::IOLoop->reset;
50 0         0 close $r;
51              
52 0         0 my $serializer = $self->serializer;
53              
54 0         0 local $@;
55 0         0 my $res = eval {
56 0         0 local $SIG{__DIE__};
57 0         0 $serializer->([undef, $job->(@$args)]);
58             };
59 0 0       0 $res = $serializer->([$@]) if $@;
60              
61 0         0 _send($w, $res);
62              
63             # attempt to generalize exiting from child cleanly on all platforms
64             # adapted from POE::Wheel::Run mostly
65 0         0 eval { POSIX::_exit(0) } unless IS_WINDOWS;
  0         0  
66 0         0 eval { CORE::kill KILL => $$ };
  0         0  
67 0         0 exit 0;
68              
69             } else {
70             # parent
71 22         685 close $w;
72 22         529 my $parent = $$;
73 22         1685 $self->emit( spawn => $child );
74              
75 22         1711 my $stream = Mojo::IOLoop::Stream->new($r)->timeout(0);
76 22         4271 $self->ioloop->stream($stream);
77              
78 22         7509 my $buffer = '';
79 22     29   609 $stream->on( read => sub { $buffer .= $_[1] } );
  29         4034346  
80              
81 22 100       472 Scalar::Util::weaken($self) if $self->weaken;
82              
83 22 0   0   697 $stream->on( error => sub { $self->emit( error => $_[1] ) if $self } );
  0         0  
84              
85 22         353 my $deserializer = $self->deserializer;
86             $stream->on( close => sub {
87 22 50   22   23702 return unless $$ == $parent; # not my stream!
88 22         77 local $@;
89              
90             # clean up the zombie. It won't block, it's already dead.
91 22         432 waitpid $child, 0;
92              
93             # attempt to deserialize, emit error and return early
94 22         120 my $res = eval { $deserializer->($buffer) };
  22         232  
95 22 100       2187 if ($@) {
96 2 50       52 $self->emit( error => $@ ) if $self;
97 2         29 return;
98             }
99              
100             # call the callback, emit error if it fails
101 20 100       73 eval { $self->$cb(@$res) if $cb };
  20         181  
102 20 100 66     5813 $self->emit( error => $@ ) if $@ and $self;
103              
104             # emit the finish event, emit error if IT fails
105 19 100       85 eval { $self->emit( finish => @$res ) if $self };
  19         218  
106 19 100 66     456 $self->emit( error => $@ ) if $@ and $self;
107              
108 22         472 });
109             }
110             }
111              
112             ## functions
113              
114             sub fork_call (&@) {
115 3     3 1 3163 my $job = shift;
116 3         12 my $cb = pop;
117             return __PACKAGE__->new->run($job, \@_, sub {
118             # local $_ = shift; #TODO think about this
119 3     3   14 shift;
120 3         7 local $@ = shift;
121 3         12 $cb->(@_);
122 3         48 });
123             }
124              
125             sub _send {
126 0     0     my ($h, $data) = @_;
127 0           if (IS_WINDOWS || IS_CYGWIN) {
128             my $len = length $data;
129             my $written = 0;
130             while ($written < $len) {
131             my $count = syswrite $h, $data, 65536, $written;
132             unless (defined $count) { warn $!; last }
133             $written += $count;
134             }
135             } else {
136 0 0         warn $! unless defined syswrite $h, $data;
137             }
138             }
139              
140             1;
141              
142              
143             __END__