File Coverage

blib/lib/Mojo/IOLoop/ForkCall.pm
Criterion Covered Total %
statement 72 93 77.4
branch 20 30 66.6
condition 7 9 77.7
subroutine 16 19 84.2
pod 2 2 100.0
total 117 153 76.4


line stmt bran cond sub pod time code
1             package Mojo::IOLoop::ForkCall;
2              
3 10     10   2322549 use Mojo::Base 'Mojo::EventEmitter';
  10         76  
  10         59  
4              
5             our $VERSION = '0.20';
6             $VERSION = eval $VERSION;
7              
8 10     10   7657 use Mojo::IOLoop;
  10         428376  
  10         61  
9 10     10   5889 use IO::Pipely 'pipely';
  10         24693  
  10         652  
10 10     10   75 use POSIX ();
  10         24  
  10         151  
11 10     10   51 use Scalar::Util ();
  10         31  
  10         195  
12              
13 10     10   4761 use Perl::OSType 'is_os_type';
  10         4429  
  10         728  
14 10     10   80 use constant IS_WINDOWS => is_os_type('Windows');
  10         30  
  10         43  
15 10     10   936 use constant IS_CYGWIN => $^O eq 'cygwin';
  10         24  
  10         557  
16              
17 10     10   67 use Exporter 'import';
  10         25  
  10         11710  
18             our @EXPORT_OK = qw/fork_call/;
19              
20             has 'ioloop' => sub { Mojo::IOLoop->singleton };
21             has 'serializer' => sub { require Storable; \&Storable::freeze };
22             has 'deserializer' => sub { require Storable; \&Storable::thaw };
23             has 'weaken' => 0;
24              
25             sub run {
26 22     22 1 937642 my ($self, @args) = @_;
27 22     22   201 my $delay = $self->ioloop->delay(sub{ $self->_run(@args) });
  22         3724  
28 22     0   3943 $delay->catch(sub{ $self->emit( error => pop ) });
  0         0  
29 22         1187 return $self;
30             }
31              
32             sub _run {
33 22     22   68 my ($self, $job) = (shift, shift);
34 22         58 my ($args, $cb);
35 22 100 100     250 $args = shift if @_ and ref $_[0] eq 'ARRAY';
36 22 100       84 $cb = shift if @_;
37              
38 22         197 my ($r, $w) = pipely;
39              
40 22         40386 my $child = fork;
41 22 50       1202 die "Failed to fork: $!" unless defined $child;
42              
43 22 50       982 if ($child == 0) {
44             # child
45              
46             # cleanup running loops
47 0         0 $self->ioloop->reset;
48 0         0 delete $self->{ioloop}; # not sure this is needed
49 0         0 Mojo::IOLoop->reset;
50 0         0 close $r;
51              
52 0         0 my $serializer = $self->serializer;
53              
54 0         0 local $@;
55 0         0 my $res = eval {
56 0         0 local $SIG{__DIE__};
57 0         0 $serializer->([undef, $job->(@$args)]);
58             };
59 0 0       0 $res = $serializer->([$@]) if $@;
60              
61 0         0 _send($w, $res);
62              
63             # attempt to generalize exiting from child cleanly on all platforms
64             # adapted from POE::Wheel::Run mostly
65 0         0 eval { POSIX::_exit(0) } unless IS_WINDOWS;
  0         0  
66 0         0 eval { CORE::kill KILL => $$ };
  0         0  
67 0         0 exit 0;
68              
69             } else {
70             # parent
71 22         916 close $w;
72 22         690 my $parent = $$;
73 22         2141 $self->emit( spawn => $child );
74              
75 22         2272 my $stream = Mojo::IOLoop::Stream->new($r)->timeout(0);
76 22         8868 $self->ioloop->stream($stream);
77              
78 22         11804 my $buffer = '';
79 22     29   677 $stream->on( read => sub { $buffer .= $_[1] } );
  29         4042299  
80              
81 22 100       559 Scalar::Util::weaken($self) if $self->weaken;
82              
83 22 0   0   953 $stream->on( error => sub { $self->emit( error => $_[1] ) if $self } );
  0         0  
84              
85 22         450 my $deserializer = $self->deserializer;
86             $stream->on( close => sub {
87 22 50   22   28649 return unless $$ == $parent; # not my stream!
88 22         78 local $@;
89              
90             # clean up the zombie. It won't block, it's already dead.
91 22         843 waitpid $child, 0;
92              
93             # attempt to deserialize, emit error and return early
94 22         121 my $res = eval { $deserializer->($buffer) };
  22         330  
95 22 100       2707 if ($@) {
96 2 50       110 $self->emit( error => $@ ) if $self;
97 2         31 return;
98             }
99              
100             # call the callback, emit error if it fails
101 20 100       90 eval { $self->$cb(@$res) if $cb };
  20         262  
102 20 100 66     5754 $self->emit( error => $@ ) if $@ and $self;
103              
104             # emit the finish event, emit error if IT fails
105 19 100       113 eval { $self->emit( finish => @$res ) if $self };
  19         237  
106 19 100 66     521 $self->emit( error => $@ ) if $@ and $self;
107              
108 22         561 });
109             }
110             }
111              
112             ## functions
113              
114             sub fork_call (&@) {
115 3     3 1 5031 my $job = shift;
116 3         12 my $cb = pop;
117             return __PACKAGE__->new->run($job, \@_, sub {
118             # local $_ = shift; #TODO think about this
119 3     3   7 shift;
120 3         10 local $@ = shift;
121 3         22 $cb->(@_);
122 3         34 });
123             }
124              
125             sub _send {
126 0     0     my ($h, $data) = @_;
127 0           if (IS_WINDOWS || IS_CYGWIN) {
128             my $len = length $data;
129             my $written = 0;
130             while ($written < $len) {
131             my $count = syswrite $h, $data, 65536, $written;
132             unless (defined $count) { warn $!; last }
133             $written += $count;
134             }
135             } else {
136 0 0         warn $! unless defined syswrite $h, $data;
137             }
138             }
139              
140             1;
141              
142              
143             __END__