File Coverage

blib/lib/Mojo/IOLoop/ForkCall.pm
Criterion Covered Total %
statement 76 97 78.3
branch 20 30 66.6
condition 7 9 77.7
subroutine 17 20 85.0
pod 2 2 100.0
total 122 158 77.2


line stmt bran cond sub pod time code
1             package Mojo::IOLoop::ForkCall;
2              
3 10     10   3063964 use Mojo::Base 'Mojo::EventEmitter';
  10         87  
  10         63  
4              
5             our $VERSION = '0.21';
6             $VERSION = eval $VERSION;
7              
8 10     10   7727 use Mojo::IOLoop;
  10         466623  
  10         61  
9 10     10   4654 use Mojo::IOLoop::Delay;
  10         8506  
  10         86  
10 10     10   5820 use IO::Pipely 'pipely';
  10         24872  
  10         665  
11 10     10   92 use POSIX ();
  10         24  
  10         211  
12 10     10   60 use Scalar::Util ();
  10         24  
  10         243  
13              
14 10     10   5243 use Perl::OSType 'is_os_type';
  10         4533  
  10         697  
15 10     10   76 use constant IS_WINDOWS => is_os_type('Windows');
  10         21  
  10         51  
16 10     10   936 use constant IS_CYGWIN => $^O eq 'cygwin';
  10         92  
  10         542  
17              
18 10     10   123 use Exporter 'import';
  10         25  
  10         13043  
19             our @EXPORT_OK = qw/fork_call/;
20              
21             has 'ioloop' => sub { Mojo::IOLoop->singleton };
22             has 'serializer' => sub { require Storable; \&Storable::freeze };
23             has 'deserializer' => sub { require Storable; \&Storable::thaw };
24             has 'weaken' => 0;
25              
26             sub run {
27 22     22 1 1107454 my ($self, @args) = @_;
28 22         290 my $delay = Mojo::IOLoop::Delay->new->ioloop($self->ioloop);
29 22     22   710 $delay->steps(sub{ $self->_run(@args) });
  22         4000  
30 22     0   2858 $delay->catch(sub{ $self->emit( error => pop ) });
  0         0  
31 22         1550 return $self;
32             }
33              
34             sub _run {
35 22     22   90 my ($self, $job) = (shift, shift);
36 22         51 my ($args, $cb);
37 22 100 100     250 $args = shift if @_ and ref $_[0] eq 'ARRAY';
38 22 100       91 $cb = shift if @_;
39              
40 22         220 my ($r, $w) = pipely;
41              
42 22         38308 my $child = fork;
43 22 50       1507 die "Failed to fork: $!" unless defined $child;
44              
45 22 50       1096 if ($child == 0) {
46             # child
47              
48             # cleanup running loops
49 0         0 $self->ioloop->reset;
50 0         0 delete $self->{ioloop}; # not sure this is needed
51 0         0 Mojo::IOLoop->reset;
52 0         0 close $r;
53              
54 0         0 my $serializer = $self->serializer;
55              
56 0         0 local $@;
57 0         0 my $res = eval {
58 0         0 local $SIG{__DIE__};
59 0         0 $serializer->([undef, $job->(@$args)]);
60             };
61 0 0       0 $res = $serializer->([$@]) if $@;
62              
63 0         0 _send($w, $res);
64              
65             # attempt to generalize exiting from child cleanly on all platforms
66             # adapted from POE::Wheel::Run mostly
67 0         0 eval { POSIX::_exit(0) } unless IS_WINDOWS;
  0         0  
68 0         0 eval { CORE::kill KILL => $$ };
  0         0  
69 0         0 exit 0;
70              
71             } else {
72             # parent
73 22         1059 close $w;
74 22         851 my $parent = $$;
75 22         2348 $self->emit( spawn => $child );
76              
77 22         2454 my $stream = Mojo::IOLoop::Stream->new($r)->timeout(0);
78 22         9803 $self->ioloop->stream($stream);
79              
80 22         9951 my $buffer = '';
81 22     29   966 $stream->on( read => sub { $buffer .= $_[1] } );
  29         4046963  
82              
83 22 100       723 Scalar::Util::weaken($self) if $self->weaken;
84              
85 22 0   0   898 $stream->on( error => sub { $self->emit( error => $_[1] ) if $self } );
  0         0  
86              
87 22         619 my $deserializer = $self->deserializer;
88             $stream->on( close => sub {
89 22 50   22   41327 return unless $$ == $parent; # not my stream!
90 22         94 local $@;
91              
92             # clean up the zombie. It won't block, it's already dead.
93 22         697 waitpid $child, 0;
94              
95             # attempt to deserialize, emit error and return early
96 22         98 my $res = eval { $deserializer->($buffer) };
  22         256  
97 22 100       2928 if ($@) {
98 2 50       60 $self->emit( error => $@ ) if $self;
99 2         44 return;
100             }
101              
102             # call the callback, emit error if it fails
103 20 100       115 eval { $self->$cb(@$res) if $cb };
  20         233  
104 20 100 66     5510 $self->emit( error => $@ ) if $@ and $self;
105              
106             # emit the finish event, emit error if IT fails
107 19 100       96 eval { $self->emit( finish => @$res ) if $self };
  19         325  
108 19 100 66     505 $self->emit( error => $@ ) if $@ and $self;
109              
110 22         722 });
111             }
112             }
113              
114             ## functions
115              
116             sub fork_call (&@) {
117 3     3 1 4599 my $job = shift;
118 3         9 my $cb = pop;
119             return __PACKAGE__->new->run($job, \@_, sub {
120             # local $_ = shift; #TODO think about this
121 3     3   19 shift;
122 3         7 local $@ = shift;
123 3         14 $cb->(@_);
124 3         48 });
125             }
126              
127             sub _send {
128 0     0     my ($h, $data) = @_;
129 0           if (IS_WINDOWS || IS_CYGWIN) {
130             my $len = length $data;
131             my $written = 0;
132             while ($written < $len) {
133             my $count = syswrite $h, $data, 65536, $written;
134             unless (defined $count) { warn $!; last }
135             $written += $count;
136             }
137             } else {
138 0 0         warn $! unless defined syswrite $h, $data;
139             }
140             }
141              
142             1;
143              
144              
145             __END__