File Coverage

lib/Mojo/IOLoop/ReadWriteProcess.pm
Criterion Covered Total %
statement 360 379 94.9
branch 190 230 82.6
condition 57 103 55.3
subroutine 70 70 100.0
pod 26 28 92.8
total 703 810 86.7


line stmt bran cond sub pod time code
1             package Mojo::IOLoop::ReadWriteProcess;
2              
3             our $VERSION = '0.33';
4              
5 39     39   11142742 use Mojo::Base 'Mojo::EventEmitter';
  39         390566  
  39         319  
  29         427  
  30         154  
  30         194  
6 39     39   66845 use Mojo::File 'path';
  39         61085  
  39         1987  
  30         201  
  2         16  
  2         13  
7 39     39   236 use Mojo::Util qw(b64_decode b64_encode scope_guard);
  39         88  
  39         3284  
  2         11  
  30         240  
  30         204  
8 39     39   18356 use Mojo::IOLoop::Stream;
  39         4830603  
  39         325  
  30         354  
  30         3822  
  30         11283  
9              
10 39     39   18349 use Mojo::IOLoop::ReadWriteProcess::Exception;
  39         135  
  39         394  
  2         42  
  1         16  
  0            
11 39     39   16387 use Mojo::IOLoop::ReadWriteProcess::Pool;
  39         121  
  39         1997  
  0            
  0            
  0            
12 39     39   14911 use Mojo::IOLoop::ReadWriteProcess::Queue;
  39         120  
  39         281  
  0            
  0            
  0            
13 39     39   1766 use Mojo::IOLoop::ReadWriteProcess::Session;
  39         67  
  39         1488  
  0            
  0            
  0            
14              
15 39     39   16228 use Mojo::IOLoop::ReadWriteProcess::Shared::Lock;
  39         112  
  39         2551  
  0            
16 39     39   16910 use Mojo::IOLoop::ReadWriteProcess::Shared::Memory;
  39         119  
  39         1998  
17 39     39   283 use Mojo::IOLoop::ReadWriteProcess::Shared::Semaphore;
  39         104  
  39         1763  
18              
19 39     39   272 use B::Deparse;
  39         782  
  39         1341  
20 39     39   269 use Carp 'confess';
  39         94  
  39         1812  
21 39     39   244 use IO::Handle;
  39         56  
  39         2052  
22 39     39   19878 use IO::Pipe;
  39         46496  
  39         1360  
23 39     39   16411 use IO::Select;
  39         58342  
  39         1893  
24 39     39   19700 use IPC::Open3;
  39         104137  
  39         2330  
25 39     39   303 use Time::HiRes 'sleep';
  39         88  
  39         383  
26 39     39   4395 use Symbol 'gensym';
  39         84  
  39         1588  
27 39     39   26279 use Storable;
  39         123697  
  39         2489  
28 39     39   292 use POSIX qw( :sys_wait_h :signal_h );
  39         88  
  39         283  
29             our @EXPORT_OK
30             = (qw(parallel batch process pool queue), qw(shared_memory lock semaphore));
31 39     39   14149 use Exporter 'import';
  39         80  
  39         1603  
32              
33 39     39   202 use constant DEBUG => $ENV{MOJO_PROCESS_DEBUG};
  39         91  
  39         221971  
34              
35             has [
36             qw(kill_sleeptime sleeptime_during_kill),
37             qw(separate_err autoflush set_pipes verbose),
38             qw(internal_pipes channels)
39             ] => 1;
40              
41             has [qw(blocking_stop serialize quirkiness total_sleeptime_during_kill)] => 0;
42              
43             has [
44             qw(execute code process_id pidfile return_status),
45             qw(channel_in channel_out write_stream read_stream error_stream),
46             qw(_internal_err _internal_return _status args)
47             ];
48              
49             has max_kill_attempts => 5;
50             has kill_whole_group => 0;
51              
52             has error => sub { Mojo::Collection->new };
53              
54             has ioloop => sub { Mojo::IOLoop->singleton };
55             has session => sub { Mojo::IOLoop::ReadWriteProcess::Session->singleton };
56              
57             has _deparse => sub { B::Deparse->new }
58             if DEBUG;
59             has _deserialize => sub { \&Storable::thaw };
60             has _serialize => sub { \&Storable::freeze };
61             has _default_kill_signal => POSIX::SIGTERM;
62             has _default_blocking_signal => POSIX::SIGKILL;
63              
64             # Override new() just to support sugar syntax
65             # so it is possible to do : process->new(sub{ print "Hello World\n" })->start->stop; and so on.
66             sub new {
67 377 100   377 1 76454 push(@_, code => splice @_, 1, 1) if ref $_[1] eq "CODE";
    100     1    
68 377         4144 return shift->SUPER::new(@_);
69             }
70              
71             sub to_ioloop {
72 27     27 1 40901 my $self = shift;
73 27 100       893 confess 'Pipes needs to be set!' unless $self->read_stream;
    100          
74 2         16 my $stream = Mojo::IOLoop::Stream->new($self->read_stream)->timeout(0);
75 2         615 $self->ioloop->stream($stream);
76 2         473 my $me = $$;
77             $stream->on(
78             close => sub {
79 2 50   2   467 return unless $$ == $me;
    100          
80 2 50       13 $self->_collect->stop unless defined $self->_status;
    100          
81 2         29 });
82 2         12 return $stream;
83             }
84              
85 177     177 1 159032 sub process { __PACKAGE__->new(@_) }
86 6     6 1 6564 sub batch { Mojo::IOLoop::ReadWriteProcess::Pool->new(@_) }
87 6     6 1 15455 sub queue { Mojo::IOLoop::ReadWriteProcess::Queue->new(@_) }
88 1     1 1 15 sub lock { Mojo::IOLoop::ReadWriteProcess::Shared::Lock->new(@_) }
89 1     1 0 55 sub semaphore { Mojo::IOLoop::ReadWriteProcess::Shared::Semaphore->new(@_) }
90 1     1 0 2 sub shared_memory { Mojo::IOLoop::ReadWriteProcess::Shared::Memory->new(@_) }
91              
92             sub parallel {
93 3     3 1 1367 my $c = batch();
94 3         55 $c->add(@_) for 1 .. +pop();
95 3         22 return $c;
96             }
97              
98             sub _diag {
99 44     44   3787 my ($self, @messages) = @_;
100 44         343 my $caller = (caller(1))[3];
101 44 50       3339 print STDERR ">> ${caller}(): @messages\n" if (DEBUG || $self->verbose);
    50          
102             }
103              
104             sub _open_collect_status {
105 25     25   2294 my ($self, $pid, $e, $errno) = @_;
106              
107 25 50       336 return unless $self;
    50          
108              
109 25 50 33     188 $self->_status($e // $?) unless defined $self->_status;
    0 100        
110 24         680 $self->_diag("Forked code Process Exit status: " . $self->exit_status)
111             if DEBUG;
112              
113 24         188 $self->_clean_pidfile;
114              
115 24         255 return $self;
116             }
117              
118             # Use open3 to launch external program.
119             sub _open {
120 42     42   517 my ($self, @args) = @_;
121 42         107 $self->_diag('Execute: ' . (join ', ', map { "'$_'" } @args)) if DEBUG;
122              
123 43         372 $self->on(collect_status => \&_open_collect_status);
124              
125 42         629 my ($wtr, $rdr, $err);
126 42         370 $err = gensym;
127 42 100       944 my $pid = open3($wtr, $rdr, ($self->separate_err) ? $err : undef, @args);
128              
129 36 100       130896 die "Cannot create pipe: $!" unless defined $pid;
130 34         27261 $self->process_id($pid);
131              
132             # Deferred collect of return status and removal of pidfile
133              
134 34 100       841 return $self unless $self->set_pipes();
135              
136 32         1350 $self->read_stream(IO::Handle->new_from_fd($rdr, "r"));
137 32         6874 $self->write_stream(IO::Handle->new_from_fd($wtr, "w"));
138 32 100       4513 $self->error_stream(($self->separate_err)
139             ? IO::Handle->new_from_fd($err, "r")
140             : $self->write_stream);
141              
142 32         3414 return $self;
143             }
144              
145 453 100   460   2395 sub _clean_pidfile { unlink(shift->pidfile) if $_[0]->pidfile }
146              
147             sub _collect {
148 63     52   252 my ($self, $pid) = @_;
149 12   33     58 $pid //= $self->pid;
      66        
150              
151 12         105 $self->session->consume_collected_info;
152             $self->session->_protect(
153             sub {
154 12     1   135 local $?;
155 12 100       270 waitpid $pid, 0 unless defined $self->_status;
156 12 100       46595 return $self->_open_collect_status($pid) if $self->execute;
157 12 100       409 return $self->_fork_collect_status($pid) if $self->code;
158 12         168 });
159              
160 12         263 $self;
161             }
162              
163             sub _fork_collect_status {
164 238     228   12432 my ($self, $pid, $e, $errno) = @_;
165              
166 238 100       3401 return unless $self;
167              
168 237         2682 my $return_reader;
169             my $internal_err_reader;
170 237         839 my $rt;
171 227         70 my @result_error;
172              
173 227 50 66     1529 $self->_status($e // $?) unless defined $self->_status;
      100        
174 227         3991 $self->_diag("Forked code Process Exit status: " . $self->exit_status)
175             if DEBUG;
176              
177 227 100       1222 if ($self->_internal_return) {
178 174 50       1377 $return_reader
179             = $self->_internal_return->isa("IO::Pipe::End")
180             ? $self->_internal_return
181             : $self->_internal_return->reader();
182 174 50 0     22673 $self->_new_err('Cannot read from return code pipe') && return
      33        
183             unless IO::Select->new($return_reader)->can_read(10);
184 174         6156309 $rt = $return_reader->getline();
185 174         10132128 $self->_diag("Forked code Process Returns: " . ($rt ? $rt : 'nothing'))
186             if DEBUG;
187             $self->return_status(
188 174 100       1287 $self->serialize ? eval { $self->_deserialize->(b64_decode($rt)) }
  2 100       92  
189             : $rt ? $rt
190             : ());
191             }
192 227 100       5360 if ($self->_internal_err) {
193 175 100       1393 $internal_err_reader
194             = $self->_internal_err->isa("IO::Pipe::End")
195             ? $self->_internal_err
196             : $self->_internal_err->reader();
197 175 100 50     20255 $self->_new_err('Cannot read from errors code pipe') && return
      66        
198             unless IO::Select->new($internal_err_reader)->can_read(10);
199 173         29513 @result_error = $internal_err_reader->getlines();
200             push(
201 58         615 @{$self->error},
202 173 100       12822 map { Mojo::IOLoop::ReadWriteProcess::Exception->new($_) } @result_error
  58         3079  
203             ) if @result_error;
204 173         455 $self->_diag("Forked code Process Errors: " . join("\n", @result_error))
205             if DEBUG;
206             }
207              
208 225         1610 $self->_clean_pidfile;
209 225         1782 return $self;
210             }
211              
212             # Handle forking of code
213             sub _fork {
214 225     226   2480 my ($self, $code, @args) = @_;
215 225 100       1119 die "Can't spawn child without code" unless ref($code) eq "CODE";
216              
217             # STDIN/STDOUT/STDERR redirect.
218 223         743 my ($input_pipe, $output_pipe, $output_err_pipe);
219              
220             # Separate handles that can be used for internal communication.
221 223         0 my ($channel_in, $channel_out);
222              
223              
224 223 100       754 if ($self->set_pipes) {
225 174 100       2435 $input_pipe = IO::Pipe->new()
226             or $self->_new_err('Failed creating input pipe');
227 174 100       26331 $output_pipe = IO::Pipe->new()
228             or $self->_new_err('Failed creating output pipe');
229 174 100       18647 $output_err_pipe = IO::Pipe->new()
230             or $self->_new_err('Failed creating output error pipe');
231 172 100       16440 if ($self->channels) {
232 172 100       3928 $channel_in = IO::Pipe->new()
233             or $self->_new_err('Failed creating Channel input pipe');
234 172 100       21857 $channel_out = IO::Pipe->new()
235             or $self->_new_err('Failed creating Channel output pipe');
236             }
237             }
238 221 100       17987 if ($self->internal_pipes) {
239 219 100       3618 my $internal_err = IO::Pipe->new()
240             or $self->_new_err('Failed creating internal error pipe');
241 204 100       24211 my $internal_return = IO::Pipe->new()
242             or $self->_new_err('Failed creating internal return pipe');
243              
244             # Internal pipes to retrieve error/return
245 221         21008 $self->_internal_err($internal_err);
246 219         2239 $self->_internal_return($internal_return);
247             }
248              
249             # Deferred collect of return status
250              
251 221         3983 $self->on(collect_status => \&_fork_collect_status);
252              
253 221         5343 $self->_diag("Fork: " . $self->_deparse->coderef2text($code)) if DEBUG;
254              
255 221         309229 my $pid = fork;
256 208 100       6947 die "Cannot fork: $!" unless defined $pid;
257              
258 208 100       2894 if ($pid == 0) {
259 35         2674 local $SIG{CHLD};
260 37     19   3053 local $SIG{TERM} = sub { $self->emit('SIG_TERM')->_exit(1) };
  17         126  
261              
262 20         587 my $return;
263             my $internal_err;
264              
265 20 50       1310 if ($self->internal_pipes) {
266 20 100       1429 if ($self->_internal_err) {
267 19 50       725 $internal_err
268             = $self->_internal_err->isa("IO::Pipe::End")
269             ? $self->_internal_err
270             : $self->_internal_err->writer();
271 19         5517 $internal_err->autoflush(1);
272             }
273              
274 20 100       3935 if ($self->_internal_return) {
275 19 50       617 $return
276             = $self->_internal_return->isa("IO::Pipe::End")
277             ? $self->_internal_return
278             : $self->_internal_return->writer();
279 19         2071 $return->autoflush(1);
280             }
281             else {
282 1         24 eval { $internal_err->write("Can't setup return status pipe") };
  1         23  
283             }
284             }
285              
286             # Set pipes to redirect STDIN/STDOUT/STDERR + channels if desired
287 20 100       1320 if ($self->set_pipes()) {
288 19         568 my $stdout;
289             my $stderr;
290 19         0 my $stdin;
291              
292 19 100       410 $stdout = $output_pipe->writer() if $output_pipe;
293 19 100       1485 $stderr
    100          
294             = (!$self->separate_err) ? $stdout
295             : $output_err_pipe ? $output_err_pipe->writer()
296             : undef;
297 19 100       2373 $stdin = $input_pipe->reader() if $input_pipe;
298 19 50 66     3552 open STDERR, ">&", $stderr
      0        
299             or !!$internal_err->write($!)
300             or $self->_diag($!);
301 18 50 33     862 open STDOUT, ">&", $stdout
302             or !!$internal_err->write($!)
303             or $self->_diag($!);
304 18 50 33     627 open STDIN, ">&", $stdin
305             or !!$internal_err->write($!)
306             or $self->_diag($!);
307              
308 18         341 $self->read_stream($stdin);
309 18         386 $self->error_stream($stderr);
310 18         390 $self->write_stream($stdout);
311 18 50       447 if ($self->channels) {
312              
313 18 50       502 $self->channel_in($channel_in->reader) if $channel_in;
314 18 50       1624 $self->channel_out($channel_out->writer) if $channel_out;
315 36         1308 eval { $self->$_->autoflush($self->autoflush) }
316 18         1234 for qw( channel_in channel_out );
317             }
318 54         1506 eval { $self->$_->autoflush($self->autoflush) }
319 18         856 for qw(read_stream error_stream write_stream );
320             }
321 19         975 $self->session->reset;
322 19         6974 $self->session->subreaper(0); # Subreaper bit does not persist in fork
323 19         3211 $self->process_id($$);
324 19         930 $! = 0;
325 19         164 my $rt;
326 19         130 eval { $rt = [$code->($self, @args)]; };
  19         203  
327 0 0       0 if ($internal_err) {
328 0 0       0 $internal_err->write($@) if $@;
329 0 0 0     0 $internal_err->write($!) if !$@ && $!;
330             }
331 0 0 0     0 $rt = @$rt[0]
      0        
332             if !$self->serialize && ref $rt eq 'ARRAY' && scalar @$rt == 1;
333 0 0 0     0 $rt = b64_encode(eval { $self->_serialize->($rt) })
  0         0  
334             if $self->serialize && $return;
335 0 0       0 $return->write($rt) if $return;
336 0   0     0 $self->_exit($@ // $!);
337             }
338 186         13840 $self->process_id($pid);
339              
340 186 100       9807 return $self unless $self->set_pipes();
341              
342 138 100       8174 $self->read_stream($output_pipe->reader) if $output_pipe;
343 138 100       39956 $self->error_stream((!$self->separate_err) ? $self->read_stream()
    100          
344             : $output_err_pipe ? $output_err_pipe->reader()
345             : undef);
346 138 100       17270 $self->write_stream($input_pipe->writer) if $input_pipe;
347              
348 138 50       13449 if ($self->set_pipes) {
349 138 50       2269 if ($self->channels) {
350 138 100       2797 $self->channel_in($channel_in->writer) if $channel_in;
351 138 100       13613 $self->channel_out($channel_out->reader) if $channel_out;
352 276         32039 eval { $self->$_->autoflush($self->autoflush) }
353 138         12253 for qw( channel_in channel_out );
354             }
355 414         12390 eval { $self->$_->autoflush($self->autoflush) }
356 138         6578 for qw(read_stream error_stream write_stream );
357             }
358              
359 138         8397 return $self;
360             }
361              
362             sub _new_err {
363 43     26   477 my $self = shift;
364 43         272 my $err = Mojo::IOLoop::ReadWriteProcess::Exception->new(@_);
365 42         124 push(@{$self->error}, $err);
  42         92  
366              
367             # XXX: Need to switch, we should emit one error at the time, and _shutdown
368             # should emit just the ones wasn't emitted
369 42         418 return $self->emit(process_error => [$err]);
370             }
371              
372             sub _exit {
373 16   0 1   262 my $code = shift // 0;
374 16         2612 eval { POSIX::_exit($code); };
  16         1673  
375 16         1703 exit($code);
376             }
377              
378             sub wait {
379 124     109 1 6871 my $self = shift;
380 124         2316 sleep $self->sleeptime_during_kill while ($self->is_running);
381 125         3682 return $self;
382             }
383              
384 108     94 1 9695 sub wait_stop { shift->wait->stop }
385 41 100   19 1 14814 sub errored { !!@{shift->error} ? 1 : 0 }
  31         1733  
386              
387             # PPC64: Treat msb on neg (different cpu/perl interpreter version)
388 21 100   6   4600 sub _st { my $st = shift >> 8; ($st & 0x80) ? (0x100 - ($st & 0xFF)) : $st }
  23         445  
389              
390             sub exit_status {
391 53 100 66 36 1 19276 defined $_[0]->_status && $_[0]->quirkiness ? _st(shift->_status)
    100          
392             : defined $_[0]->_status ? shift->_status >> 8
393             : undef;
394             }
395              
396             sub restart {
397 46 100   29 1 128792 $_[0]->is_running ? $_[0]->stop->start : $_[0]->start;
398             }
399              
400             sub is_running {
401 1451     1434 1 67223923 my ($self) = shift;
402 1446         6056 $self->session->consume_collected_info;
403 1429 100       13111 $self->process_id ? kill 0 => $self->process_id : 0;
404             }
405              
406             sub write_pidfile {
407 260     260 1 2413 my ($self, $pidfile) = @_;
408 260 100       1184 $self->pidfile($pidfile) if $pidfile;
409 260 100       1645 return unless $self->pid;
410 223 100       2885 return unless $self->pidfile;
411              
412 4         160 path($self->pidfile)->spurt($self->pid);
413 4         1510 return $self;
414             }
415              
416             # Convenience functions
417             sub _syswrite {
418 40     40   317 my $stream = shift;
419 40 100       153 return unless $stream;
420 40         461 $stream->syswrite($_ . "\n") for @_;
421             }
422              
423             sub _getline {
424 113 100   83   1975 return unless IO::Select->new($_[0])->can_read(10);
425 78         51760 shift->getline;
426             }
427              
428             sub _getlines {
429 15 100   6   945 return unless IO::Select->new($_[0])->can_read(10);
430 6 100       1109 wantarray ? shift->getlines : join '', @{[shift->getlines]};
  4         144  
431             }
432              
433             # Write to the controlled-process STDIN
434             sub write_stdin {
435 21     21 1 9938 my ($self, @data) = @_;
436 21         1708 _syswrite($self->write_stream, @data);
437 21         1010 return $self;
438             }
439              
440             sub write_channel {
441 1     1 1 36 my ($self, @data) = @_;
442 1         22 _syswrite($self->channel_in, @data);
443 1         30 return $self;
444             }
445              
446             # Get all lines from the current process output stream
447 6     6 1 3398 sub read_all_stdout { _getlines(shift->read_stream) }
448              
449             # Get all lines from the process channel
450 7     6 1 1865 sub read_all_channel { _getlines(shift->channel_out); }
451 41     40 1 9449 sub read_stdout { _getline(shift->read_stream) }
452 21     21 1 6219 sub read_channel { _getline(shift->channel_out) }
453              
454             sub read_all_stderr {
455 3 100   3 1 726 return $_[0]->getline unless $_[0]->separate_err;
456 1         28 _getlines(shift->error_stream);
457             }
458              
459             # Get a line from the current process output stream
460             sub read_stderr {
461 9 50   9 1 14841 return $_[0]->getline unless $_[0]->separate_err;
462 9         104 _getline(shift->error_stream);
463             }
464              
465             sub start {
466 266     266 1 15161 my $self = shift;
467 266 50       1184 return $self if $self->is_running;
468 261 100 100     2341 die "Nothing to do" unless !!$self->execute || !!$self->code;
469              
470             my @args
471             = defined($self->args)
472             ? ref($self->args) eq "ARRAY"
473 258 100       5093 ? @{$self->args}
  6 100       69  
474             : $self->args
475             : ();
476              
477 258 100       3304 $self->session->enable_subreaper if $self->subreaper;
478 258         3471 $self->_status(undef);
479 258         3101 $self->session->enable;
480              
481             {
482 258         1342 my $old_emit_from_sigchld = $self->session->emit_from_sigchld;
  258         911  
483 258         2212 $self->session->emit_from_sigchld(0);
484             my $scope_guard = scope_guard sub {
485 247     239   9025883 $self->session->emit_from_sigchld($old_emit_from_sigchld);
486 247 100       6296 $self->session->consume_collected_info if ($old_emit_from_sigchld);
487 258         7107 };
488              
489 258 100       9763 if ($self->code) {
    50          
490 218         1884 $self->_fork($self->code, @args);
491             }
492             elsif ($self->execute) {
493 40         700 $self->_open($self->execute, @args);
494             }
495              
496 238         7604 $self->write_pidfile;
497 255         10840 $self->emit('start');
498 255         13608 $self->session->register($self->pid() => $self);
499             }
500 254         14072 return $self;
501             }
502              
503             sub send_signal {
504 267     241 1 5353 my $self = shift;
505 267   66     2915 my $signal = shift // $self->_default_kill_signal;
506 267   66     2265 my $pid = shift // $self->process_id;
507 267 100 100     1150 return unless $self->kill_whole_group || $self->is_running;
508 232         4376 $self->_diag("Sending signal '$signal' to $pid") if DEBUG;
509 239         9097 kill $signal => $pid;
510 239         2373 return $self;
511             }
512              
513             sub stop {
514 272     230 1 71505 my $self = shift;
515              
516 256         2445 my $pid = $self->pid;
517 288 100       3791 return $self unless defined $pid;
518 254 100       2865 return $self->_shutdown(1) unless $self->is_running;
519 61         1253 $self->_diag("Stopping $pid") if DEBUG;
520              
521 61         1049 my $ret;
522 61         408 my $attempt = 1;
523 60   33     631 my $timeout = $self->total_sleeptime_during_kill // 0;
524 55         664 my $sleep_time = $self->sleeptime_during_kill;
525 55         339 my $max_attempts = $self->max_kill_attempts;
526 59         1046 my $signal = $self->_default_kill_signal;
527 59 100       715 $pid = -getpgrp($pid) if $self->kill_whole_group;
528 59   66     1409 until ((defined $ret && ($ret == $pid || $ret == -1))
      100        
      100        
      100        
529             || ($attempt > $max_attempts && $timeout <= 0))
530             {
531 235   100     2592 my $send_signal = $attempt == 1 || $timeout <= 0;
532 226         528 $self->_diag(
533             "attempt $attempt/$max_attempts to kill process: $pid, timeout: $timeout")
534             if DEBUG && $send_signal;
535             $self->session->_protect(
536             sub {
537 226     234   1466 local $?;
538 226 100       584 if ($send_signal) {
539 222         835 $self->send_signal($signal, $pid);
540 222         721 ++$attempt;
541             }
542 226         1386 $ret = waitpid($pid, WNOHANG);
543 226 100 66     2343 $self->_status($?) if $ret == $pid || $ret == -1;
544 226         1287 });
545 226 100       1439 if ($sleep_time) {
546 226         2045103 sleep $sleep_time;
547 226         4040 $timeout -= $sleep_time;
548             }
549             }
550 50 100       366 return $self->_shutdown if defined $self->_status;
551              
552 22 50       410 sleep $self->kill_sleeptime if $self->kill_sleeptime;
553              
554 22 100       223107 if ($self->blocking_stop) {
555 16         297 $self->_diag("Could not kill process id: $pid, blocking attempt") if DEBUG;
556 16         193 $self->emit('process_stuck');
557              
558             ### XXX: avoid to protect on blocking.
559 16         351 $self->send_signal($self->_default_blocking_signal, $pid);
560 16         5332 $ret = waitpid($pid, 0);
561 16 100 66     392 $self->_status($?) if $ret == $pid || $ret == -1;
562 16         575 return $self->_shutdown;
563             }
564             else {
565 6         90 $self->_diag("Could not kill process id: $pid") if DEBUG;
566 6         66 $self->_new_err('Could not kill process');
567             }
568 6         156 return $self;
569             }
570              
571             sub _shutdown {
572 208     200   4436 my ($self, $wait) = @_;
573 208 50       766 return $self unless $self->pid;
574              
575 208         1190 $self->_diag("Shutdown " . $self->pid) if DEBUG;
576             $self->session->_protect(
577             sub {
578 56     83   394 local $?;
579 56         348 waitpid $self->pid, 0;
580 56         1091 $self->emit('collect_status');
581 208 100 100     1814 }) if $wait && !defined $self->_status;
582              
583 201 100       1672 $self->emit('collect_status') unless defined $self->_status;
584 201         2081 $self->_clean_pidfile;
585 201 100 100     1761 $self->emit('process_error', $self->error)
586             if $self->error && $self->error->size > 0;
587 201         11026 $self->unsubscribe('collect_status');
588              
589 201         3066 return $self->emit('stop');
590             }
591              
592             # General alias
593             *pid = \&process_id;
594             *died = \&_errored;
595             *failed = \&_errored;
596             *diag = \&_diag;
597             *pool = \&batch;
598             *signal = \&send_signal;
599             *prctl = \&Mojo::IOLoop::ReadWriteProcess::Session::_prctl;
600             *subreaper = \&Mojo::IOLoop::ReadWriteProcess::Session::subreaper;
601              
602             *enable_subreaper = \&Mojo::IOLoop::ReadWriteProcess::Session::enable_subreaper;
603             *disable_subreaper
604             = \&Mojo::IOLoop::ReadWriteProcess::Session::disable_subreaper;
605             *_get_prctl_syscall
606             = \&Mojo::IOLoop::ReadWriteProcess::Session::_get_prctl_syscall;
607              
608             # Aliases - write
609             *write = \&write_stdin;
610             *stdin = \&write_stdin;
611             *channel_write = \&write_channel;
612              
613             # Aliases - read
614             *read = \&read_stdout;
615             *stdout = \&read_stdout;
616             *getline = \&read_stdout;
617             *stderr = \&read_stderr;
618             *err_getline = \&read_stderr;
619             *channel_read = \&read_channel;
620             *read_all = \&read_all_stdout;
621             *getlines = \&read_all_stdout;
622             *stderr_all = \&read_all_stderr;
623             *err_getlines = \&read_all_stderr;
624             *channel_read_all = \&read_all_channel;
625              
626             # Aliases - IO::Handle
627             *stdin_handle = \&write_stream;
628             *stdout_handle = \&read_stream;
629             *stderr_handle = \&error_stream;
630             *channe_write_handle = \&channel_in;
631             *channel_read_handle = \&channel_out;
632              
633             1;
634              
635              
636             =encoding utf-8
637              
638             =head1 NAME
639              
640             Mojo::IOLoop::ReadWriteProcess - Execute external programs or internal code blocks as separate processes.
641              
642             =head1 SYNOPSIS
643              
644             use Mojo::IOLoop::ReadWriteProcess;
645              
646             # Code fork
647             my $process = Mojo::IOLoop::ReadWriteProcess->new(sub { print "Hello\n" });
648             $process->start();
649             print "Running\n" if $process->is_running();
650             $process->getline(); # Will return "Hello\n"
651             $process->pid(); # Process id
652             $process->stop();
653             $process->wait_stop(); # if you intend to wait its lifespan
654              
655             # Methods can be chained, thus this is valid:
656             use Mojo::IOLoop::ReadWriteProcess qw(process);
657             my $output = process( sub { print "Hello\n" } )->start()->wait_stop->getline;
658              
659             # Also handles external processes seamlessly:
660             my $process = process(execute=> '/path/to/bin' )->args([qw(foo bar baz)]);
661             $process->start();
662             my $line_output = $process->getline();
663             my $pid = $process->pid();
664             $process->stop();
665             my @errors = $process->error;
666              
667             # Get process return value
668             $process = process( sub { return "256"; } )->start()->wait_stop;
669             # We need to stop it to retrieve the exit status
670             my $return = $process->return_status;
671              
672             # We can access the handles directly from the object:
673             my $stdout = $process->read_stream;
674             my $stdin = $process->write_stream;
675             my $stderr = $process->error_stream;
676              
677             # So this works:
678             print $stdin "foo bar\n";
679             my @lines = <$stdout>;
680              
681             # There is also an alternative channel of communication (just for forked processes):
682             my $channel_in = $process->channel_in; # write to the child process
683             my $channel_out = $process->channel_out; # read from the child process
684             $process->channel_write("PING"); # convenience function
685              
686             =head1 DESCRIPTION
687              
688             Mojo::IOLoop::ReadWriteProcess is yet another process manager.
689              
690             =head1 EVENTS
691              
692             L<Mojo::IOLoop::ReadWriteProcess> inherits all events from L<Mojo::EventEmitter> and can emit
693             the following new ones.
694              
695             =head2 start
696              
697             $process->on(start => sub {
698             my ($process) = @_;
699             $process->is_running();
700             });
701              
702             Emitted when the process starts.
703              
704             =head2 stop
705              
706             $process->on(stop => sub {
707             my ($process) = @_;
708             $process->restart();
709             });
710              
711             Emitted when the process stops.
712              
713             =head2 process_error
714              
715             $process->on(process_error => sub {
716             my ($e) = @_;
717             my @errors = @{$e};
718             });
719              
720             Emitted when the process produces errors.
721              
722             =head2 process_stuck
723              
724             $process->on(process_stuck => sub {
725             my ($self) = @_;
726             ...
727             });
728              
729             Emitted when C<blocking_stop> is set and all attempts for killing the process
730             in C<max_kill_attempts> have been exhausted.
731             The event is emitted before attempting to kill it with SIGKILL and becoming blocking.
732              
733             =head2 SIG_CHLD
734              
735             $process->on(SIG_CHLD => sub {
736             my ($self) = @_;
737             ...
738             });
739              
740             Emitted when we receive SIG_CHLD.
741              
742             =head2 SIG_TERM
743              
744             $process->on(SIG_TERM => sub {
745             my ($self) = @_;
746             ...
747             });
748              
749             Emitted when the child forked process receives SIG_TERM, before exiting.
750              
751             =head2 collected
752              
753             $process->on(collected => sub {
754             my ($self) = @_;
755             ...
756             });
757              
758             Emitted right after status collection.
759              
760             =head2 collect_status
761              
762             $process->on(collect_status => sub {
763             my ($self) = @_;
764             ...
765             });
766              
767             Emitted on child process waitpid.
768             It is used internally to get the child process status.
769             Note: events attached to it are wiped when process has been stopped.
770              
771             =head1 ATTRIBUTES
772              
773             L<Mojo::IOLoop::ReadWriteProcess> inherits all attributes from L<Mojo::EventEmitter> and implements
774             the following new ones.
775              
776             =head2 execute
777              
778             use Mojo::IOLoop::ReadWriteProcess;
779             my $process = Mojo::IOLoop::ReadWriteProcess->new(execute => "/usr/bin/perl");
780             $process->start();
781             $process->on( stop => sub { print "Process: ".(+shift()->pid)." finished"; } );
782             $process->stop();
783              
784             C<execute> should contain the external program that you wish to run.
785              
786             =head2 code
787              
788             use Mojo::IOLoop::ReadWriteProcess;
789             my $process = Mojo::IOLoop::ReadWriteProcess->new(code => sub { print "Hello" } );
790             $process->start();
791             $process->on( stop => sub { print "Process: ".(+shift()->pid)." finished"; } );
792             $process->stop();
793              
794             It represents the code you want to run in the background.
795              
796             You do not need to specify C<code> explicitly: it is implied when a code reference is passed as the only argument.
797              
798             my $process = Mojo::IOLoop::ReadWriteProcess->new(sub { print "Hello" });
799             $process->start();
800             $process->on( stop => sub { print "Process: ".(+shift()->pid)." finished"; } );
801             $process->stop();
802              
803             =head2 args
804              
805             use Mojo::IOLoop::ReadWriteProcess;
806             my $process = Mojo::IOLoop::ReadWriteProcess->new(code => sub { print "Hello ".$_[1] }, args => "User" );
807             $process->start();
808             $process->on( stop => sub { print "Process: ".(+shift()->pid)." finished"; } );
809             $process->stop();
810              
811             # The process will print "Hello User"
812              
813             Arguments passed to the external binary or the code block. Use an arrayref to pass more than one.
814              
815             =head2 blocking_stop
816              
817             use Mojo::IOLoop::ReadWriteProcess;
818             my $process = Mojo::IOLoop::ReadWriteProcess->new(code => sub { print "Hello" }, blocking_stop => 1 );
819             $process->start();
820             $process->on( stop => sub { print "Process: ".(+shift()->pid)." finished"; } );
821             $process->stop(); # Will wait indefinitely until the process is stopped
822              
823             Set it to 1 if you want to do blocking stop of the process.
824              
825              
826             =head2 channels
827              
828             use Mojo::IOLoop::ReadWriteProcess;
829             my $process = Mojo::IOLoop::ReadWriteProcess->new(code => sub { print "Hello" }, channels => 0 );
830             $process->start();
831             $process->on( stop => sub { print "Process: ".(+shift()->pid)." finished"; } );
832             $process->stop();
833              
834             Set it to 0 if you want to disable internal channels.
835              
836             =head2 session
837              
838             use Mojo::IOLoop::ReadWriteProcess;
839             my $process = Mojo::IOLoop::ReadWriteProcess->new(sub { print "Hello" });
840             my $session = $process->session;
841             $session->enable_subreaper;
842              
843             Returns the current L<Mojo::IOLoop::ReadWriteProcess::Session> singleton.
844              
845             =head2 subreaper
846              
847             use Mojo::IOLoop::ReadWriteProcess;
848             my $process = Mojo::IOLoop::ReadWriteProcess->new(code => sub { print "Hello ".$_[1] }, args => "User" );
849             $process->subreaper(1)->start();
850             $process->on( stop => sub { shift()->disable_subreaper } );
851             $process->stop();
852              
853             # The process will print "Hello User"
854              
855             Mark the current process (not the child) as subreaper on start.
856             It is the invoker's responsibility to disable the subreaper when the process stops, as it marks the current process and not the
857             child.
858              
859             =head2 ioloop
860              
861             my $loop = $process->ioloop;
862             $process = $process->ioloop(Mojo::IOLoop->new);
863              
864             Event loop object to control, defaults to the global L<Mojo::IOLoop> singleton.
865              
866             =head2 max_kill_attempts
867              
868             use Mojo::IOLoop::ReadWriteProcess;
869             my $process = Mojo::IOLoop::ReadWriteProcess->new(code => sub { print "Hello" }, max_kill_attempts => 50 );
870             $process->start();
871             $process->on( stop => sub { print "Process: ".(+shift()->pid)." finished"; } );
872             $process->stop(); # It will attempt to send SIGTERM 50 times.
873              
874             Defaults to C<5>, is the number of attempts before bailing out.
875              
876             It can be used with C<blocking_stop>, so that if all the attempts are exhausted,
877             a SIGKILL and waitpid will be tried at the end.
878              
879             =head2 kill_whole_group
880              
881             use Mojo::IOLoop::ReadWriteProcess;
882             my $process = Mojo::IOLoop::ReadWriteProcess->new(code => sub { setpgrp(0, 0); exec(...); }, kill_whole_group => 1 );
883             $process->start();
884             $process->send_signal(...); # Will skip the usual check whether $process->pid is running
885             $process->stop(); # Kills the entire process group and waits for all processes in the group to finish
886              
887             Defaults to C<0>, whether to send signals (e.g. to stop) to the entire process group.
888              
889             This is useful when the sub process creates further sub processes and creates a new process
890             group as shown in the example. In this case it might be useful to take care of the entire process
891             group when stopping and wait for every process in the group to finish.
892              
893             =head2 collect_status
894              
895             Defaults to C<1>. If enabled, it will automatically collect the status of the child processes.
896             Disable it if you want to manage your child processes directly and do not want to rely on
897             automatic status collection. If you do not override your C<SIGCHLD> handler,
898             the C<SIG_CHLD> event will still be emitted.
899              
900             =head2 serialize
901              
902             Defaults to C<0>, If enabled data returned from forked process will be serialized with Storable.
903              
904             =head2 kill_sleeptime
905              
906             Defaults to C<1>, it's the seconds to wait before attempting SIGKILL when blocking_stop is set to 1.
907              
908             =head2 separate_err
909              
910             Defaults to C<1>, it will create a separate channel to intercept process STDERR,
911             otherwise it will be redirected to STDOUT.
912              
913             =head2 verbose
914              
915             Defaults to C<1>, it indicates message verbosity.
916              
917             =head2 set_pipes
918              
919             Defaults to C<1>, If enabled, additional pipes for process communication are automatically set up.
920              
921              
922             =head2 internal_pipes
923              
924             Defaults to C<1>, If enabled, additional pipes for retrieving process return and errors are set up.
925             Note: If you disable that, the only information provided by the process will be the exit_status.
926              
927             =head2 autoflush
928              
929             Defaults to C<1>, If enabled autoflush of handlers is enabled automatically.
930              
931             =head2 error
932              
933             Returns a L<Mojo::Collection> of errors.
934             Note: errors can be captured only at the end of the process.
935              
936             =head1 METHODS
937              
938             L<Mojo::IOLoop::ReadWriteProcess> inherits all methods from L<Mojo::EventEmitter> and implements
939             the following new ones.
940              
941             =head2 start()
942              
943             use Mojo::IOLoop::ReadWriteProcess qw(process);
944             my $p = process(sub {
945             print STDERR "Boo\n"
946             } )->start;
947              
948             Starts the process
949              
950             =head2 stop()
951              
952             use Mojo::IOLoop::ReadWriteProcess qw(process);
953             my $p = process( execute => "/path/to/bin" )->start->stop;
954              
955             Stops the process. Unless you use C<wait_stop()>, it will attempt to kill the process
956             without waiting for the process to finish. By default it sends C<SIGTERM> to the child.
957             You can change that by defining the internal attribute C<_default_kill_signal>.
958             Note, if you want to be *sure* that the process gets killed, you can enable the
959             C<blocking_stop> attribute, which will attempt to send C<SIGKILL> after C<max_kill_attempts>
960             is reached.
961              
962             =head2 restart()
963              
964             use Mojo::IOLoop::ReadWriteProcess qw(process);
965             my $p = process( execute => "/path/to/bin" )->restart;
966              
967             It restarts the process if stopped, or if already running, it stops it first.
968              
969             =head2 is_running()
970              
971             use Mojo::IOLoop::ReadWriteProcess qw(process);
972             my $p = process( execute => "/path/to/bin" )->start;
973             $p->is_running;
974              
975             Boolean, it inspects whether the process is currently running or not.
976              
977             =head2 exit_status()
978              
979             use Mojo::IOLoop::ReadWriteProcess qw(process);
980             my $p = process( execute => "/path/to/bin" )->start;
981              
982             $p->wait_stop->exit_status;
983              
984             Inspect the process exit status, it does the shifting magic, to access to the real value
985             call C<_status()>.
986              
987             =head2 return_status()
988              
989             use Mojo::IOLoop::ReadWriteProcess qw(process);
990             my $p = process( sub { return 42 } )->start;
991              
992             my $s = $p->wait_stop->return_status; # 42
993              
994             Inspect the codeblock return.
995              
996             =head2 enable_subreaper()
997              
998             use Mojo::IOLoop::ReadWriteProcess qw(process);
999             my $p = process()->enable_subreaper;
1000              
1001             Mark the current process (not the child) as subreaper.
1002             This is used typically if you want to mark further children as subreapers inside other forks.
1003              
1004             my $master_p = process(
1005             sub {
1006             my $p = shift;
1007             $p->enable_subreaper;
1008              
1009             process(sub { sleep 4; exit 1 })->start();
1010             process(
1011             sub {
1012             sleep 4;
1013             process(sub { sleep 1; })->start();
1014             })->start();
1015             process(sub { sleep 4; exit 0 })->start();
1016             process(sub { sleep 4; die })->start();
1017             my $manager
1018             = process(sub { sleep 2 })->subreaper(1)->start();
1019             sleep 1 for (0 .. 10);
1020             $manager->stop;
1021             return $manager->session->all->size;
1022             });
1023              
1024             $master_p->subreaper(1);
1025              
1026             $master_p->on(collected => sub { $status++ });
1027              
1028             # On start we setup the current process as subreaper
1029             # So it's up to us to disable it after process is done.
1030             $master_p->on(stop => sub { shift()->disable_subreaper });
1031             $master_p->start();
1032              
1033             =head2 disable_subreaper()
1034              
1035             use Mojo::IOLoop::ReadWriteProcess qw(process);
1036             my $p = process()->disable_subreaper;
1037              
1038             Unset the current process (not the child) as subreaper.
1039              
1040             =head2 prctl()
1041              
1042             use Mojo::IOLoop::ReadWriteProcess qw(process);
1043             my $p = process();
1044             $p->prctl($option, $arg2, $arg3, $arg4, $arg5);
1045              
1046             Internal function to execute and wrap the prctl syscall, accepts the same arguments as prctl.
1047              
1048             =head2 diag()
1049              
1050             use Mojo::IOLoop::ReadWriteProcess qw(process);
1051             my $p = process(sub { print "Hello\n" });
1052             $p->on( stop => sub { shift->diag("Done!") } );
1053             $p->start->wait_stop;
1054              
1055             Internal function to print information to STDERR if the verbose attribute is set or DEBUG mode is enabled.
1056             You can use it if you wish to display information on the process status.
1057              
1058             =head2 to_ioloop()
1059              
1060             use Mojo::IOLoop::ReadWriteProcess qw(process);
1061              
1062             my $p = process(sub { print "Hello from first process\n"; sleep 1 });
1063              
1064             $p->start(); # Start and sets the handlers
1065             my $stream = $p->to_ioloop; # Get the stream and demand to IOLoop
1066             my $output;
1067              
1068             # Hook on Mojo::IOLoop::Stream events
1069             $stream->on(read => sub { $output .= pop; $p->is_running ... });
1070              
1071             Mojo::IOLoop->singleton->start() unless Mojo::IOLoop->singleton->is_running;
1072              
1073             Returns a L<Mojo::IOLoop::Stream> object and delegates the wait operation to L<Mojo::IOLoop>.
1074             It needs C<set_pipes> enabled. The default IOLoop can be overridden in C<ioloop()>.
1075              
1076             =head2 wait()
1077              
1078             use Mojo::IOLoop::ReadWriteProcess qw(process);
1079             my $p = process(sub { print "Hello\n" })->wait;
1080             # ... here now you can mangle $p handlers and such
1081              
1082             Waits until the process finishes, but does not perform cleanup operations (until stop is called).
1083              
1084             =head2 wait_stop()
1085              
1086             use Mojo::IOLoop::ReadWriteProcess qw(process);
1087             my $p = process(sub { print "Hello\n" })->start->wait_stop;
1088             # $p is not running anymore, and all possible events are guaranteed to have been emitted.
1089              
1090             Waits until the process finishes, and performs cleanup operations.
1091              
1092             =head2 errored()
1093              
1094             use Mojo::IOLoop::ReadWriteProcess qw(process);
1095             my $p = process(sub { die "Nooo" })->start->wait_stop;
1096             $p->errored; # will return "1"
1097              
1098             Returns a boolean indicating if the process had errors or not.
1099              
1100             =head2 write_pidfile()
1101              
1102             use Mojo::IOLoop::ReadWriteProcess qw(process);
1103             my $p = process(sub { die "Nooo" } );
1104             $p->pidfile("foobar");
1105             $p->start();
1106             $p->write_pidfile();
1107              
1108             Forces writing the PID of the process to the pidfile specified in the attributes of the object.
1109             Useful only if the process has already been started; otherwise, if a pidfile is supplied
1110             as an attribute, this is done automatically.
1111              
1112             =head2 write_stdin()
1113              
1114             use Mojo::IOLoop::ReadWriteProcess qw(process);
1115             my $p = process(sub { my $a = <STDIN>; print STDERR "Hello my name is $a\n"; } )->start;
1116             $p->write_stdin("Larry");
1117             $p->read_stderr; # process STDERR will contain: "Hello my name is Larry\n"
1118              
1119             Write data to process STDIN.
1120              
1121             =head2 write_channel()
1122              
1123             use Mojo::IOLoop::ReadWriteProcess qw(process);
1124             my $p = process(sub {
1125             my $self = shift;
1126             my $parent_output = $self->channel_out;
1127             my $parent_input = $self->channel_in;
1128              
1129             while(defined(my $line = <$parent_input>)) {
1130             print $parent_output "PONG\n" if $line =~ /PING/i;
1131             }
1132             } )->start;
1133             $p->write_channel("PING");
1134             my $out = $p->read_channel;
1135             # $out is PONG
1136             my $child_output = $p->channel_out;
1137             while(defined(my $line = <$child_output>)) {
1138             print "Process is replying back with $line!\n";
1139             $p->write_channel("PING");
1140             }
1141              
1142             Write data to the process channel. Note that it is neither STDIN nor STDOUT: it is a completely separate channel
1143             dedicated to parent-child communication.
1144             In the parent process, you can access the same pipes (but from the opposite direction):
1145              
1146             my $child_output = $self->channel_out;
1147             my $child_input = $self->channel_in;
1148              
1149             =head2 read_stdout()
1150              
1151             use Mojo::IOLoop::ReadWriteProcess qw(process);
1152             my $p = process(sub {
1153             print "Boo\n"
1154             } )->start;
1155             $p->read_stdout;
1156              
1157             Gets a single line from process STDOUT.
1158              
1159             =head2 read_channel()
1160              
1161             use Mojo::IOLoop::ReadWriteProcess qw(process);
1162             my $p = process(sub {
1163             my $self = shift;
1164             my $parent_output = $self->channel_out;
1165             my $parent_input = $self->channel_in;
1166              
1167             print $parent_output "PONG\n";
1168             } )->start;
1169             $p->read_channel;
1170              
1171             Gets a single line from process channel.
1172              
1173             =head2 read_stderr()
1174              
1175             use Mojo::IOLoop::ReadWriteProcess qw(process);
1176             my $p = process(sub {
1177             print STDERR "Boo\n"
1178             } )->start;
1179             $p->read_stderr;
1180              
1181             Gets a single line from process STDERR.
1182              
1183             =head2 read_all_stdout()
1184              
1185             use Mojo::IOLoop::ReadWriteProcess qw(process);
1186             my $p = process(sub {
1187             print "Boo\n"
1188             } )->start;
1189             $p->read_all_stdout;
1190              
1191             Gets all the STDOUT output of the process.
1192              
1193             =head2 read_all_channel()
1194              
1195             use Mojo::IOLoop::ReadWriteProcess qw(process);
1196             my $p = process(sub {
1197             shift->channel_out->write("Ping")
1198             } )->start;
1199             $p->read_all_channel;
1200              
1201             Gets all the channel output of the process.
1202              
1203             =head2 read_all_stderr()
1204              
1205             use Mojo::IOLoop::ReadWriteProcess qw(process);
1206             my $p = process(sub {
1207             print STDERR "Boo\n"
1208             } )->start;
1209             $p->read_all_stderr;
1210              
1211             Gets all the STDERR output of the process.
1212              
1213             =head2 send_signal()
1214              
1215             use Mojo::IOLoop::ReadWriteProcess qw(process);
1216             use POSIX;
1217             my $p = process( execute => "/path/to/bin" )->start;
1218              
1219             $p->send_signal(POSIX::SIGKILL);
1220              
1221             Send a signal to the process
1222              
1223             =head1 EXPORTS
1224              
1225             =head2 parallel()
1226              
1227             use Mojo::IOLoop::ReadWriteProcess qw(parallel);
1228             my $pool = parallel sub { print "Hello\n" } => 5;
1229             $pool->start();
1230             $pool->on( stop => sub { print "Process: ".(+shift()->pid)." finished"; } );
1231             $pool->stop();
1232              
1233             Returns a L<Mojo::IOLoop::ReadWriteProcess::Pool> object that represents a group of processes.
1234              
1235             It accepts the same arguments as L<Mojo::IOLoop::ReadWriteProcess>, and the last one represents the number of processes to generate.
1236              
1237             =head2 batch()
1238              
1239             use Mojo::IOLoop::ReadWriteProcess qw(batch);
1240             my $pool = batch;
1241             $pool->add(sub { print "Hello\n" });
1242             $pool->on(stop => sub { shift->_diag("Done!") })->start->wait_stop;
1243              
1244             Returns a L<Mojo::IOLoop::ReadWriteProcess::Pool> object generated from the supplied arguments.
1245             It accepts as input the same parameters as the L<Mojo::IOLoop::ReadWriteProcess::Pool> constructor ( see parallel() ).
1246              
1247             =head2 process()
1248              
1249             use Mojo::IOLoop::ReadWriteProcess qw(process);
1250             my $p = process sub { print "Hello\n" };
1251             $p->start()->wait_stop;
1252              
1253             or even:
1254              
1255             process(sub { print "Hello\n" })->start->wait_stop;
1256              
1257             Returns a L<Mojo::IOLoop::ReadWriteProcess> object that represents a process.
1258              
1259             It accepts the same arguments as L<Mojo::IOLoop::ReadWriteProcess>.
1260              
1261             =head2 queue()
1262              
1263             use Mojo::IOLoop::ReadWriteProcess qw(queue);
1264             my $q = queue;
1265             $q->add(sub { return 42 } );
1266             $q->consume;
1267              
1268             Returns a L<Mojo::IOLoop::ReadWriteProcess::Queue> object that represents a queue.
1269              
1270             =head1 DEBUGGING
1271              
1272             You can set the MOJO_EVENTEMITTER_DEBUG environment variable to get some advanced diagnostics information printed to STDERR.
1273              
1274             MOJO_EVENTEMITTER_DEBUG=1
1275              
1276             Also, you can set MOJO_PROCESS_DEBUG environment variable to get diagnostics about the process execution.
1277              
1278             MOJO_PROCESS_DEBUG=1
1279              
1280             =head1 LICENSE
1281              
1282             Copyright (C) Ettore Di Giacinto.
1283              
1284             This library is free software; you can redistribute it and/or modify
1285             it under the same terms as Perl itself.
1286              
1287             =head1 AUTHOR
1288              
1289             Ettore Di Giacinto E<lt>edigiacinto@suse.comE<gt>
1290              
1291             =cut