File Coverage

blib/lib/Parallel/SubFork/Task.pm
Criterion Covered Total %
statement 73 93 78.4
branch 27 42 64.2
condition 4 9 44.4
subroutine 11 12 91.6
pod 6 6 100.0
total 121 162 74.6


line stmt bran cond sub pod time code
1             package Parallel::SubFork::Task;
2              
3             =head1 NAME
4              
5             Parallel::SubFork::Task - Run Perl functions in forked processes.
6              
7             =head1 SYNOPSIS
8              
9             use Parallel::SubFork::Task;
10            
11             # Run some arbitrary Perl code in a separate process
12             my $task = Parallel::SubFork::Task->start(\&job, @args);
13             $task->wait_for();
14            
15             # Create and execute the task (same as above)
16             my $task2 = Parallel::SubFork::Task->new(\&job, @args);
17             $task2->execute();
18             $task2->wait_for();
19            
20             # Wait with a live progress
21             local $| = 1; # Force print to flush the output
22             my $task3 = Parallel::SubFork::Task->start(\&job, @args);
23             while ($task3->wait_for(0.5)) {
24             print ".";
25             }
26            
27             # Access any of the properties
28             printf "PID of task was %s\n", $task->pid;
29             printf "Args of task were %s\n", join(", ", $task->args);
30             printf "Exit code: %d\n", $task->exit_code;
31              
32             =head1 DESCRIPTION
33              
34             This module provides a simpler way to run arbitrary Perl code in a different
35             process. This module consists of a fancy wrapper over the system calls C<fork>
36             and C<waitpid>. The idea is to execute any standard Perl function in a different
37             process without any of the inconveniences of managing the forks by hand.
38              
39             =head1 TASK
40              
41             This module is used to encapsulate a task, i.e. the function to be executed in
42             a different process and its arguments. In a nutshell, a task consists of a
43             reference to a Perl function (C<\&my_sub>) or a closure (C<sub { ... }>),
44             also known as an anonymous subroutine, and optionally the arguments to provide
45             to that function.
46              
47             A task also stores some runtime properties such as the PID of the process that
48             executed the code, the exit code and the exit status of the process. These
49             properties can then be inspected by the parent process through their dedicated
50             accessors.
51              
52             There are also some helper methods that are used to create the child process and
53             to wait for it to finish.
54              
55             =head1 PROCESSES
56              
57             Keep in mind that the function being executed is run in a different process.
58             This means that any modification performed within that function will only affect
59             the process running the task. This is true even for global variables. All data
60             exchange or communication between the parent and the child process has to be
61             implemented manually through standard I<inter-process communication> (IPC)
62             mechanisms (see L<perlipc>).
63              
64             The child process used to execute the Perl subroutine has its environment
65             left unchanged. This means that all file descriptors, signal handlers and other
66             resources are still available. It's up to the subroutine to prepare a proper
67             environment for itself.
68              
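The isolation described above can be illustrated with plain C<fork> and a pipe. This is only a minimal sketch using core Perl primitives, not this module's API; the variable names (C<$counter>, C<$reader>, C<$writer>) are made up for the example.

```perl
use strict;
use warnings;
use POSIX qw(_exit);

our $counter = 0;

# Pipe used to send data from the child back to the parent
pipe(my $reader, my $writer) or die "Can't create pipe: $!";

my $pid = fork();
die "Can't fork: $!" unless defined $pid;

if ($pid == 0) {
    # Child: this assignment only changes the child's copy of $counter
    close $reader;
    $counter = 42;
    print {$writer} "counter=$counter\n";
    close $writer;    # flushes the buffered line into the pipe
    _exit(0);
}

# Parent: read what the child sent, then reap the child
close $writer;
my $message = <$reader>;
close $reader;
waitpid($pid, 0);
chomp $message;

print "parent counter: $counter\n";    # still 0
print "child said: $message\n";        # counter=42
```

The parent only learns about the child's value because it was written to the pipe explicitly; the global itself is never shared.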
69             =head1 RETURN VALUES
70              
71             The subroutine's return value will be used as the process exit code; this is the
72             only thing that the invoking process will be able to get back from the task
73             without any kind of IPC. This means that the return value should be an integer.
74             Furthermore, since the return value is used as an exit value, C<0> is
75             considered a successful execution while any other value is usually
76             interpreted as an error.
77              
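A rough sketch of how a return value travels back as an exit code, written with core C<fork>, C<waitpid> and the L<POSIX> wait macros rather than with this module. The helper C<run_in_child> and the function C<job> are hypothetical names introduced only for illustration.

```perl
use strict;
use warnings;
use POSIX qw(WIFEXITED WEXITSTATUS _exit);

# Hypothetical job: returns 0 on success and a non-zero integer on failure
sub job {
    my ($n) = @_;
    return $n > 10 ? 3 : 0;
}

# Hypothetical helper: runs $code in a child and returns its exit code
sub run_in_child {
    my ($code, @args) = @_;

    my $pid = fork();
    die "Can't fork: $!" unless defined $pid;

    if ($pid == 0) {
        my $return = 1;    # assume failure unless the job returns normally
        eval { $return = $code->(@args); 1 } or warn "Child failed: $@";
        _exit($return);
    }

    # Parent: wait for this specific child and decode its status
    waitpid($pid, 0);
    return WIFEXITED($?) ? WEXITSTATUS($?) : 1;
}

my $ok  = run_in_child(\&job, 5);
my $bad = run_in_child(\&job, 50);
print "ok=$ok bad=$bad\n";    # ok=0 bad=3
```

Keep in mind that a process exit code only carries 8 bits, so only integers from 0 to 255 survive the trip unchanged.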
78             =head1 EXIT
79              
80             The subroutine is free to raise any exceptions through C<die> or any similar
81             mechanism. If an exception is caught by the framework it will be interpreted as an
82             error and an appropriate exit value will be used.
83              
84             If the subroutine needs to end its execution through the system call
85             C<exit> then consider instead using C<_exit> as defined in the module L<POSIX>.
86             This is because C<exit> not only terminates the current process but also performs
87             some cleanup such as calling the functions registered with C<atexit> and flushing
88             all stdio streams before finishing the process. Normally, only the main process
89             should call C<exit>; in the case of a fork the children should finish their
90             execution through C<_exit>.
91              
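The difference between the two calls can be observed through Perl's C<END> blocks, which run on C<exit> but not on C<POSIX::_exit>. The following is a sketch under that assumption; the scratch file and the variables C<$log>, C<$parent> and C<$mode> exist only for the demonstration.

```perl
use strict;
use warnings;
use POSIX qw(_exit);
use File::Temp qw(tempfile);

# Scratch file where a child records that its END block ran
my ($fh, $log) = tempfile(UNLINK => 0);
close $fh;

my $parent = $$;
my $mode;    # how the current child is going to terminate

END {
    # END blocks are inherited by forked children; they run on exit()
    # but are skipped by POSIX::_exit()
    if (defined $parent and $$ != $parent) {
        open my $out, '>>', $log or die "Can't append to $log: $!";
        print {$out} "child that called $mode\n";
        close $out;
    }
}

for my $how ('exit', '_exit') {
    $mode = $how;
    my $pid = fork();
    die "Can't fork: $!" unless defined $pid;
    if ($pid == 0) {
        exit(0) if $how eq 'exit';
        _exit(0);
    }
    waitpid($pid, 0);
}

open my $in, '<', $log or die "Can't read $log: $!";
my @lines = <$in>;
close $in;
unlink $log;

print "END block ran in: @lines";
```

Only the child that called C<exit> leaves a line in the scratch file, which is why cleanup code meant for the main process can run unexpectedly in children that terminate through C<exit>.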
92             =head1 PROCESS WAIT
93              
94             Waiting for a process to finish can be problematic as there are multiple ways of
95             waiting for processes to finish, each having its advantages and disadvantages.
96              
97             The easiest way is to register a signal handler for the C<CHLD> signal. This has the
98             advantage of receiving the child notifications as they happen; the disadvantage
99             is that there's no way to control for which children the notifications will
100             happen. This is quite inconvenient because a lot of the nice built-in functions
101             and operators in Perl such as C<`ls`>, C<system> and even C<open> (when used in
102             conjunction with a C<|>) use child processes for their tasks and this could
103             potentially interfere with such utilities.
104              
105             Another alternative is to wait for all processes launched but this can also
106             interfere with other processes launched manually through C<fork>.
107              
108             Finally, the safest way is to wait explicitly B<only> for the processes that we
109             know to have started and nothing else. This way there will be no interference with
110             the other processes. This is exactly the approach used by this module.
111              
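The third approach can be sketched with C<waitpid> on one specific PID, polled with C<WNOHANG>. This is an illustration built on core Perl, not a copy of this module's internals; the helper C<spawn>, the delays and the exit codes are made up.

```perl
use strict;
use warnings;
use POSIX qw(WNOHANG WIFEXITED WEXITSTATUS _exit);
use Time::HiRes qw(sleep);

# Hypothetical helper: fork a child that sleeps, then exits with $code
sub spawn {
    my ($delay, $code) = @_;
    my $pid = fork();
    die "Can't fork: $!" unless defined $pid;
    if ($pid == 0) {
        sleep($delay);
        _exit($code);
    }
    return $pid;
}

my $ours  = spawn(0.3, 7);
my $other = spawn(0.1, 9);    # unrelated child that finishes first

# Poll only for our own PID; the other child is left untouched
my $status;
while (1) {
    my $result = waitpid($ours, WNOHANG);
    if ($result == $ours) {
        $status = $?;
        last;
    }
    die "PID $ours not found" if $result == -1;
    sleep(0.05);    # still running, try again shortly
}
my $exit_code = WIFEXITED($status) ? WEXITSTATUS($status) : 1;

# The unrelated child can still be reaped by whoever started it
my $reaped      = waitpid($other, 0);
my $other_code  = WEXITSTATUS($?);

print "ours exited with $exit_code, other exited with $other_code\n";
```

Because the loop never calls C<wait> or C<waitpid(-1, ...)>, the status of the unrelated child stays available for its own owner.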
112             =head1 METHODS
113              
114             A task defines the following methods:
115              
116             =cut
117              
118              
119 6     6   45400 use strict;
  6         12  
  6         192  
120 6     6   29 use warnings;
  6         11  
  6         177  
121              
122 6         43 use POSIX qw(
123             WNOHANG
124             WIFEXITED
125             WEXITSTATUS
126             WIFSIGNALED
127             _exit
128 6     6   4700 );
  6         36814  
129              
130 6     6   6162 use Carp;
  6         10  
  6         344  
131              
132 6     6   32 use base qw(Class::Accessor::Fast);
  6         12  
  6         5363  
133             __PACKAGE__->mk_accessors(
134             qw(
135             _ppid
136             pid
137             code
138             exit_code
139             status
140             )
141             );
142              
143              
144             # Version of the module
145             our $VERSION = '0.08';
146              
147              
148             # Check if it's possible to use a high precision alarm
149             my $HIRES; # NOTE the initialization must be done in the BEGIN block otherwise
150             # the default value will override whatever was set in the BEGIN
151             # block.
152             BEGIN {
153 6     6   22681 $HIRES = 0; # Assume that there's no HiRes
154 6         15 eval {
155 6         6000 require Time::HiRes;
156 6         17071 $HIRES = 1;
157             };
158             }
159              
160              
161             =head2 start
162              
163             Creates and executes a new task, this is simply a small shortcut for starting
164             new tasks.
165              
166             In order to manage tasks easily consider using the module
167             L<Parallel::SubFork> instead.
168              
169             Parameters:
170              
171             $code: the code reference to execute in a different process.
172             @args: the arguments to pass to the code reference (optional).
173              
174             =cut
175              
176             sub start {
177 22     22 1 12906 my $class = shift;
178 22         56 my ($code, @args) = @_;
179 22 100       2555 croak "First parameter must be a code reference" unless ref $code eq 'CODE';
180            
181 12         78 my $task = $class->new($code, @args);
182 12         51 $task->execute();
183              
184 12         851 return $task;
185             }
186              
187              
188             =head2 new
189              
190             Creates a new task, this is simply a constructor and the task will not be
191             started yet.
192              
193             The task can later be started through a call to L</execute>.
194              
195             In order to manage tasks easily consider using the module
196             L<Parallel::SubFork> instead.
197              
198             Parameters:
199              
200             =over
201              
202             =item $code
203              
204             The code reference to execute.
205              
206             =item @args (optional)
207              
208             The arguments to pass to the code reference.
209              
210             =back
211              
212             =cut
213              
214             sub new {
215 25     25 1 16490 my $class = shift;
216 25         75 my ($code, @args) = @_;
217 25 100       1120 croak "First parameter must be a code reference" unless ref $code eq 'CODE';
218            
219             # Create a blessed instance
220 20   33     279 my $self = bless {}, ref($class) || $class;
221 20         145 $self->code($code);
222 20         336 $self->{args} = \@args;
223            
224 20         64 return $self;
225             }
226              
227              
228             =head2 code
229              
230             Accessor to the function (code reference) that will be executed in a different
231             process. This is what the child process will execute.
232              
233             This function is expected to return C<0> for success and any other integer to
234             indicate a failure. The function is free to raise any kind of exception as the
235             framework will catch all exceptions and return an error value instead.
236              
237             The function will receive its parameters normally through the variable C<@_>.
238              
239             =head2 pid
240              
241             The PID of the process executing the subroutine, the child's PID.
242              
243             =head2 exit_code
244              
245             The exit code of the task, this is the value returned by C,
246             C or C.
247              
248             =head2 status
249              
250             The exit code returned to the parent process as described by C. The status
251             code can be inspected through the L<"POSIX/WAIT"> macros.
252              
253             =head2 args
254              
255             The arguments that will be given to the subroutine being executed in a separate
256             process. The subroutine will receive these very same arguments through C<@_>.
257              
258             This method always returns its values as a list and not as an array ref.
259              
260             =cut
261              
262             sub args {
263 9     9 1 12206 my $self = shift;
264            
265 9         28 my $args = $self->{args};
266 9 50       52 my @args = defined $args ? @{ $args } : ();
  9         38  
267 9         145 return @args;
268             }
269              
270              
271             =head2 execute
272              
273             Executes the task (the code reference encapsulated by this task) in a new
274             process. The code reference will be invoked with the arguments passed in the
275             constructor.
276              
277             This method performs the actual fork and returns immediately to the invoker,
278             while the child process will start to execute the code defined in the code
279             reference. Once the subroutine has finished the child process will exit right
280             away.
281              
282             The invoker (the parent process) should call L</wait_for> in order to wait for
283             the child process to finish and obtain its exit value.
284              
285             =cut
286              
287             sub execute {
288 24     24 1 22092 my $self = shift;
289              
290             # Check that we don't run twice the same task
291 24 100       88 if (defined $self->pid) {
292 5         1136 croak "Task already executed";
293             }
294            
295             # Make sure that there's a code reference
296 19         150 my $code = $self->code;
297 19 100 66     283 if (! (defined $code and ref $code eq 'CODE')) {
298 5         1530 croak "Task requires a valid code reference (function)";
299             }
300              
301 14         64 my $ppid = $$;
302              
303             # Fork a child
304 14         14850 my $pid = fork();
305            
306             # Check if the fork succeeded
307 14 50       624 if (! defined $pid) {
308 0         0 croak "Can't fork because: $!";
309             }
310            
311 14         1299 $self->_ppid($ppid);
312 14 50       696 if ($pid == 0) {
313             ## CHILD part
314              
315             # Execute the main code
316 0         0 my $return = 1;
317             eval {
318 0         0 $return = $code->($self->args);
319 0         0 1;
320 0 0       0 } or do {
321 0         0 my $error = $@;
322 0         0 carp "Child executed with errors: ", $error;
323             };
324            
325             # This is as far as the kid gets; if the callback hasn't called exit we do it
326 0         0 _exit($return);
327             }
328             else {
329             ## PARENT part
330 14         159 $self->pid($pid);
331             }
332             }
333              
334              
335             =head2 wait_for
336              
337             Waits until the process running the task (the code reference) has finished. By
338             default this method waits forever until the task finishes either naturally or due to
339             an error.
340              
341             If a parameter is passed then it is assumed to be the number of seconds to wait.
342             Once the timeout has expired the method will return with a true value. This is
343             the only condition under which the method will return with a true value.
344              
345             If the module L<Time::HiRes> is available then the timeout can be in fractions (ex:
346             0.5 for half a second) otherwise full integers have to be provided. If not, Perl
347             will truncate the value during the conversion to int.
348              
349             The timeout is implemented through C<sleep> and has all the caveats of sleep,
350             see perldoc -f sleep for more details. Remember that sleep could take a second
351             less than requested (sleep 1 could do no sleep at all) and mixing calls to sleep
352             and alarm is at your own risk as sleep is sometimes implemented through alarm.
353             Furthermore, if a timeout between 0 and 1 second is provided as a fraction and
354             L<Time::HiRes> is not available, Perl will truncate the value to 0.
355              
356             The exit status of the process can be inspected through the accessor
357             L</exit_code> and the actual status, the value returned in C<$?> by C<waitpid>,
358             can be accessed through the accessor L</status>.
359              
360             Parameters:
361              
362             =over
363              
364             =item $timeout (optional)
365              
366             The number of seconds to wait until the method returns due to a timeout. If
367             undef then the method doesn't apply a timeout and waits until the task has
368             finished.
369              
370             =back
371              
372             Returns:
373              
374             If the method was invoked without a timeout then a false value will always be
375             returned, no matter the outcome of the task. If a timeout was provided then the
376             method will return a true value only when the timeout has been reached otherwise
377             a false value will be returned.
378              
379             =cut
380              
381             sub wait_for {
382 25     25 1 36846 my $self = shift;
383 25         92 my ($timeout) = @_;
384              
385 25         144 my $pid = $self->pid;
386 25 50 33     464 if (! (defined $pid and $pid > 0) ) {
387 0         0 croak "Task isn't started yet";
388             }
389            
390             # Only the real parent can wait for the child
391 25 50       134 if ($self->_ppid != $$) {
392 0         0 croak "Only the parent process can wait for the task";
393             }
394            
395             # Check if the task was already waited for
396 25 100       401 if (defined $self->status) {
397 7         59 return;
398             }
399            
400 18         149 my $timemout_done = 0; # Use to track if the waitpid was called enough times when passed a timeout
401 18 100       92 my $flags = defined $timeout ? WNOHANG : 0;
402 18         55 while (1) {
403            
404             # Wait for the specific PID
405 22         33442 my $result = waitpid($pid, $flags);
406              
407 22 50       263 if ($result == -1) {
    100          
    50          
408             # No more processes to wait for, but we didn't find our PID
409 0         0 croak "No more processes to wait PID $pid not found";
410             }
411             elsif ($result == 0) {
412             # The process is still running
413              
414             # If the method was called with a timeout we will retry waitpid once more;
415             # remember that it is invoked with no hang which means that the call will
416             # return instantaneously.
417 8 50       27 if (defined $timeout) {
418              
419             # In the case of a timeout we invoke this code once
420 8 100       77 return 1 if $timemout_done++;
421              
422             # NOTE: The timeout is implemented with a sleep instead of an alarm
423             # because some versions/combinations of perl and Time::HiRes cause
424             # Time::HiRes::alarm() to fail to interrupt system calls. For more
425             # information about this see Ticket #51465:
426             # https://rt.cpan.org/Ticket/Display.html?id=51465
427              
428             # Sleep and alarms don't mix well together, so we stop the current alarm
429             # and restore it later on.
430 4         20 my $alarm = alarm(0);
431 4 50       10 if ($HIRES) {
432 4         4001295 Time::HiRes::sleep($timeout);
433             }
434             else {
435 0         0 sleep($timeout);
436             }
437              
438             # If an alarm was set, restore it
439 4 50       83 alarm($alarm) if $alarm;
440             }
441              
442             # Continue waiting as the process is still running
443 4         17 next;
444             }
445             elsif ($result != $pid) {
446             # Strange we got another PID than ours
447 0         0 croak "Got a status change for PID $result while waiting for PID $pid";
448             }
449            
450             # Now we got a decent answer from waitpid, this doesn't mean that the child
451             # died! It just means that the child got a state change (the child
452             # terminated; the child was stopped by a signal; or the child was resumed
453             # by a signal). Here we must check if the process finished properly
454             # otherwise we must continue waiting for the end of the process.
455 14         220 my $status = $?;
456 14 50       97 if (WIFEXITED($status)) {
    0          
457 14         188 $self->status($status);
458 14         457 $self->exit_code(WEXITSTATUS($status));
459 14         193 return;
460             }
461             elsif (WIFSIGNALED($status)) {
462 0           $self->status($status);
463             # WEXITSTATUS is only defined for WIFEXITED, here we assume an error
464 0           $self->exit_code(1);
465 0           return;
466             }
467             }
468            
469 0           return;
470             }
471              
472              
473             =head2 kill
474              
475             Sends a signal to the process. This is a simple wrapper over the system call
476             C<kill>. It takes the same kind of signal as the built-in kill function.
477              
478             B: Calling kill doesn't guarantee that the task will die. Most signals can
479             be caught by the process and may not kill it. In order to be sure that the
480             process is killed it is advised to call L</wait_for>. Even if the signal kills
481             the process L</wait_for> has to be called otherwise the task's process will be
482             flagged as a zombie process (see L).
483              
484             The following code snippet shows how to properly kill a task:
485              
486             my $task = Parallel::SubFork::Task->start(\&job);
487             if ($task->wait_for(2)) {
488             # Impatient block
489             $task->kill('KILL');
490             $task->wait_for();
491             }
492              
493             Parameters:
494              
495             =over
496              
497             =item $signal
498              
499             The signal to send to the process. Same as the first parameter passed to the
500             Perl built-in.
501              
502             =back
503              
504             Returns:
505              
506             The same value as Perl's C<kill>.
507              
508             =cut
509              
510             sub kill {
511 0     0 1   my $self = shift;
512 0           my ($signal) = @_;
513 0           kill $signal, $self->pid;
514             }
515              
516              
517             # Return a true value
518             1;
519              
520             =head1 NOTES
521              
522             The API is not yet frozen and could change as the module goes public.
523              
524             =head1 SEE ALSO
525              
526             Take a look at L for asynchronous multitasking and networking.
527              
528             =head1 AUTHOR
529              
530             Emmanuel Rodriguez, E<lt>emmanuel.rodriguez@gmail.comE<gt>
531              
532             =head1 COPYRIGHT AND LICENSE
533              
534             Copyright (C) 2008-2010 by Emmanuel Rodriguez
535              
536             This library is free software; you can redistribute it and/or modify
537             it under the same terms as Perl itself, either Perl version 5.8.8 or,
538             at your option, any later version of Perl 5 you may have available.
539              
540             =cut