File Coverage

blib/lib/Mojo/IOLoop/Subprocess.pm
Criterion Covered Total %
statement 21 74 28.3
branch 0 16 0.0
condition 0 10 0.0
subroutine 7 18 38.8
pod 5 5 100.0
total 33 123 26.8


line stmt bran cond sub pod time code
1             package Mojo::IOLoop::Subprocess;
2 63     63   481 use Mojo::Base 'Mojo::EventEmitter';
  63         158  
  63         417  
3              
4 63     63   427 use Config;
  63         1282  
  63         2940  
5 63     63   417 use Mojo::IOLoop;
  63         194  
  63         617  
6 63     63   377 use Mojo::IOLoop::Stream;
  63         159  
  63         485  
7 63     63   27927 use Mojo::JSON;
  63         213  
  63         3171  
8 63     63   30179 use Mojo::Promise;
  63         236  
  63         460  
9 63     63   462 use POSIX ();
  63         144  
  63         71504  
10              
11             has deserialize => sub { \&Mojo::JSON::decode_json };
12             has ioloop => sub { Mojo::IOLoop->singleton }, weak => 1;
13             has serialize => sub { \&Mojo::JSON::encode_json };
14              
15 0     0 1   sub exit_code { shift->{exit_code} }
16              
17 0     0 1   sub pid { shift->{pid} }
18              
19             sub run {
20 0     0 1   my ($self, @args) = @_;
21 0     0     $self->ioloop->next_tick(sub { $self->_start(@args) });
  0            
22 0           return $self;
23             }
24              
25             sub run_p {
26 0     0 1   my ($self, $child) = @_;
27              
28 0           my $p = Mojo::Promise->new;
29             my $parent = sub {
30 0     0     my ($self, $err) = (shift, shift);
31 0 0         $err ? $p->reject($err) : $p->resolve(@_);
32 0           };
33 0     0     $self->ioloop->next_tick(sub { $self->_start($child, $parent) });
  0            
34              
35 0           return $p;
36             }
37              
38             sub _start {
39 0     0     my ($self, $child, $parent) = @_;
40              
41             # No fork emulation support
42 0 0         return $self->$parent('Subprocesses do not support fork emulation') if $Config{d_pseudofork};
43              
44             # Pipe for subprocess communication
45 0 0         return $self->$parent("Can't create pipe: $!") unless pipe(my $reader, $self->{writer});
46 0           $self->{writer}->autoflush(1);
47              
48             # Child
49 0 0         return $self->$parent("Can't fork: $!") unless defined(my $pid = $self->{pid} = fork);
50 0 0         unless ($pid) {
51 0 0         eval {
52 0           $self->ioloop->reset({freeze => 1});
53 0   0       my $results = eval { [$self->$child] } // [];
  0            
54 0           print {$self->{writer}} '0-', $self->serialize->([$@, @$results]);
  0            
55 0           $self->emit('cleanup');
56             } or warn $@;
57 0           POSIX::_exit(0);
58             }
59              
60             # Parent
61 0           my $me = $$;
62 0           close $self->{writer};
63 0           my $stream = Mojo::IOLoop::Stream->new($reader)->timeout(0);
64 0           $self->emit('spawn')->ioloop->stream($stream);
65 0           my $buffer = '';
66             $stream->on(
67             read => sub {
68 0     0     $buffer .= pop;
69 0           while (1) {
70 0           my ($len) = $buffer =~ /^([0-9]+)\-/;
71 0 0 0       last unless $len and length $buffer >= $len + $+[0];
72 0           my $snippet = substr $buffer, 0, $len + $+[0], '';
73 0           my $args = $self->deserialize->(substr $snippet, $+[0]);
74 0           $self->emit(progress => @$args);
75             }
76             }
77 0           );
78             $stream->on(
79             close => sub {
80 0 0   0     return unless $$ == $me;
81 0           waitpid $pid, 0;
82 0           $self->{exit_code} = $? >> 8;
83 0           substr $buffer, 0, 2, '';
84 0   0       my $results = eval { $self->deserialize->($buffer) } // [];
  0            
85 0   0       $self->$parent(shift(@$results) // $@, @$results);
86             }
87 0           );
88             }
89              
90             sub progress {
91 0     0 1   my ($self, @args) = @_;
92 0           my $serialized = $self->serialize->(\@args);
93 0           print {$self->{writer}} length($serialized), '-', $serialized;
  0            
94             }
95              
96             1;
97              
98             =encoding utf8
99              
100             =head1 NAME
101              
102             Mojo::IOLoop::Subprocess - Subprocesses
103              
104             =head1 SYNOPSIS
105              
106             use Mojo::IOLoop::Subprocess;
107              
108             # Operation that would block the event loop for 5 seconds
109             my $subprocess = Mojo::IOLoop::Subprocess->new;
110             $subprocess->run(
111             sub ($subprocess) {
112             sleep 5;
113             return '♥', 'Mojolicious';
114             },
115             sub ($subprocess, $err, @results) {
116             say "Subprocess error: $err" and return if $err;
117             say "I $results[0] $results[1]!";
118             }
119             );
120              
121             # Operation that would block the event loop for 5 seconds (with promise)
122             $subprocess->run_p(sub {
123             sleep 5;
124             return '♥', 'Mojolicious';
125             })->then(sub (@results) {
126             say "I $results[0] $results[1]!";
127             })->catch(sub {
128             my $err = shift;
129             say "Subprocess error: $err";
130             });
131              
132             # Start event loop if necessary
133             $subprocess->ioloop->start unless $subprocess->ioloop->is_running;
134              
135             =head1 DESCRIPTION
136              
137             L allows L to perform computationally expensive operations in subprocesses,
138             without blocking the event loop.
139              
140             =head1 EVENTS
141              
142             L inherits all events from L and can emit the following new ones.
143              
144             =head2 cleanup
145              
146             $subprocess->on(cleanup => sub ($subprocess) {...});
147              
148             Emitted in the subprocess right before the process will exit.
149              
150             $subprocess->on(cleanup => sub ($subprocess) { say "Process $$ is about to exit" });
151              
152             =head2 progress
153              
154             $subprocess->on(progress => sub ($subprocess, @data) {...});
155              
156             Emitted in the parent process when the subprocess calls the L method.
157              
158             =head2 spawn
159              
160             $subprocess->on(spawn => sub ($subprocess) {...});
161              
162             Emitted in the parent process when the subprocess has been spawned.
163              
164             $subprocess->on(spawn => sub ($subprocess) {
165             my $pid = $subprocess->pid;
166             say "Performing work in process $pid";
167             });
168              
169             =head1 ATTRIBUTES
170              
171             L implements the following attributes.
172              
173             =head2 deserialize
174              
175             my $cb = $subprocess->deserialize;
176             $subprocess = $subprocess->deserialize(sub {...});
177              
178             A callback used to deserialize subprocess return values, defaults to using L.
179              
180             $subprocess->deserialize(sub ($bytes) { return [] });
181              
182             =head2 ioloop
183              
184             my $loop = $subprocess->ioloop;
185             $subprocess = $subprocess->ioloop(Mojo::IOLoop->new);
186              
187             Event loop object to control, defaults to the global L singleton. Note that this attribute is weakened.
188              
189             =head2 serialize
190              
191             my $cb = $subprocess->serialize;
192             $subprocess = $subprocess->serialize(sub {...});
193              
194             A callback used to serialize subprocess return values, defaults to using L.
195              
196             $subprocess->serialize(sub ($array) { return '' });
197              
198             =head1 METHODS
199              
200             L inherits all methods from L and implements the following new ones.
201              
202             =head2 exit_code
203              
204             my $code = $subprocess->exit_code;
205              
206             Returns the subprocess exit code, or C if the subprocess is still running.
207              
208             =head2 pid
209              
210             my $pid = $subprocess->pid;
211              
212             Process id of the spawned subprocess if available.
213              
214             =head2 progress
215              
216             $subprocess->progress(@data);
217              
218             Send data serialized with L to the parent process at any time during the subprocess's execution. Must be
219             called by the subprocess and emits the L event in the parent process with the data.
220              
221             # Send progress information to the parent process
222             $subprocess->run(
223             sub ($subprocess) {
224             $subprocess->progress('0%');
225             sleep 5;
226             $subprocess->progress('50%');
227             sleep 5;
228             return 'Hello Mojo!';
229             },
230             sub ($subprocess, $err, @results) {
231             say 'Progress is 100%';
232             say $results[0];
233             }
234             );
235             $subprocess->on(progress => sub ($subprocess, @data) { say "Progress is $data[0]" });
236              
237             =head2 run
238              
239             $subprocess = $subprocess->run(sub {...}, sub {...});
240              
241             Execute the first callback in a child process and wait for it to return one or more values, without blocking
242             L in the parent process. Then execute the second callback in the parent process with the results. The return
243             values of the first callback and exceptions thrown by it, will be serialized with L, so they can be shared
244             between processes.
245              
246             =head2 run_p
247              
248             my $promise = $subprocess->run_p(sub {...});
249              
250             Same as L, but returns a L object instead of accepting a second callback.
251              
252             $subprocess->run_p(sub {
253             sleep 5;
254             return '♥', 'Mojolicious';
255             })->then(sub (@results) {
256             say "I $results[0] $results[1]!";
257             })->catch(sub ($err) {
258             say "Subprocess error: $err";
259             })->wait;
260              
261             =head1 SEE ALSO
262              
263             L, L, L.
264              
265             =cut