File Coverage

blib/lib/Parallel/SubFork.pm
Criterion Covered Total %
statement 53 56 94.6
branch 7 10 70.0
condition 1 3 33.3
subroutine 12 12 100.0
pod 5 5 100.0
total 78 86 90.7


line stmt bran cond sub pod time code
1             package Parallel::SubFork;
2              
3             =head1 NAME
4              
5             Parallel::SubFork - Manage Perl functions in forked processes.
6              
7             =head1 SYNOPSIS
8              
9             use Parallel::SubFork;
10             my $manager = Parallel::SubFork->new();
11            
12             # Start two parallel tasks
13             $manager->start(sub { sleep 10; print "Done\n" });
14             $manager->start(\&callback, @args);
15            
16             # Wait for all tasks to resume
17             $manager->wait_for_all();
18            
19             # Loop through all tasks
20             foreach my $task ($manager->tasks) {
21             # Access any of the properties
22             printf "Task with PID %d resumed\n", $task->pid;
23             printf "Exit status: %d, exit code: %d\n", $task->status, $task->exit_code;
24             printf "Args of task were: %s\n", join(', ', $task->args);
25             print "\n";
26             }
27              
28             or more easily:
29              
30             use Parallel::SubFork qw(sub_fork);
31            
32             my $task = sub_fork(\&callback, @args);
33             $task->wait_for();
34              
35             =head1 DESCRIPTION
36              
37             This module provides a simple wrapper over the module L<Parallel::SubFork::Task>
38             which in turn simplifies the usage of the system calls C<fork> and C<waitpid>.
39             The idea is to isolate the tasks to be executed in functions or closures and to
40             execute them in a separated process in order to take advantage of
41             parallelization.
42              
43             =head1 TASKS
44              
45             A task is simply a Perl function or a closure that will get executed in a
46             different process. This module will take care of creating and managing the new
47             processes. All that's left is to code the logic of each task and to provide the
48             proper I<inter-process communication> (IPC) mechanism if needed.
49              
50             A task will run in its own process thus it's important to understand that all
51             modifications to variables within the function, even global variables, will have
52             no impact on the parent process. Communication or data exchange between the task
53             and the dispatcher (the code that started the task) has to be performed through
54             standard IPC mechanisms. For further details on how to establish different
55             communication channels refer to the documentation of L<perlipc>.
56              
57             Since a task is running within a process it's expected that the task will return
58             an exit code (C<0> for an execution without flaws and any other integer for
59             reporting an error) and not a true value in the I<Perl> sense. The return value
60             will be used as the exit code of the process that's running the task.
61              
62             =cut
63              
64 4     4   122917 use strict;
  4         9  
  4         197  
65 4     4   22 use warnings;
  4         12  
  4         119  
66              
67 4     4   24 use Carp;
  4         12  
  4         352  
68              
69 4     4   18 use base 'Exporter';
  4         6  
  4         484  
70             our @EXPORT_OK = qw(
71             sub_fork
72             );
73              
74 4     4   2695 use Parallel::SubFork::Task;
  4         12  
  4         32  
75              
76 4     4   141 use base qw(Class::Accessor::Fast);
  4         38  
  4         2336  
77             __PACKAGE__->mk_accessors(
78             qw(
79             tasks
80             _dispatcher_pid
81             )
82             );
83              
84              
85             # Version of the module
86             our $VERSION = '0.10';
87              
88              
89             =head1 FUNCTIONS
90              
91             The module provides the following functions:
92              
93             =cut
94              
95              
96             =head2 sub_fork
97              
98             This function provides a simple way for creating and launching tasks. It is
99             declared using a prototype which allows it to be called as:
100              
101             my $task = sub_fork { print "$$ > $_\n" for 1 .. 10 };
102             $task->wait_for();
103              
104             Parameters:
105              
106             =over
107              
108             =item $code
109              
110             The code reference to execute.
111              
112             =item @args (optional)
113              
114             The arguments to pass to the code reference.
115              
116             =back
117              
118             =cut
119              
120             sub sub_fork (&;@) {
121              
                # The (&;@) prototype lets callers pass a bare block as the first
                # argument, optionally followed by a list of arguments.
122             # Arguments: the code reference to run and the optional argument list
123 3     3 1 3551 my ($code, @args) = @_;
124              
125 3         24 my $task;
                # The trailing "1;" makes the eval return true on success, so the
                # "or do" block only runs when starting the task died.
126             eval {
127 3         36 $task = Parallel::SubFork::Task->start($code, @args);
128 3         99 1;
129 3 50       12 } or do {
                # Re-throw through Carp so the error is reported from the caller's
                # perspective. Coverage: this statement was never executed (count 0).
130 0         0 croak $@;
131             };
132 3         64 return $task;
133             }
134              
135              
136             =head1 METHODS
137              
138             The module defines the following methods:
139              
140             =cut
141              
142              
143             =head2 new
144              
145             Creates a new C<Parallel::SubFork>.
146              
147             =cut
148              
149             sub new {
150              
151             # Arguments: the class name, or an existing instance when called on an object
152 3     3 1 1305 my $class = shift;
153            
154             # Create a blessed instance; "ref($class) || $class" resolves the package
                # name whether new() was invoked on a class or on an object.
                # Coverage: only 1 of the 3 condition outcomes was exercised (33%).
155 3   33     29 my $self = bless {}, ref($class) || $class;
156            
157             # The list of children spawned
                # NOTE(review): the tasks() sub defined in this file reads only its
                # invocant and ignores any value passed in, so this call may not
                # actually store the array ref -- confirm.
158 3         20 $self->tasks([]);
159            
160             # The PID of the dispatcher ($$ is the PID of the current process);
                # _assert_is_dispatcher later compares the current PID against it.
161 3         19 $self->_dispatcher_pid($$);
162              
163 3         31 return $self;
164             }
165              
166              
167             =head2 start
168              
169             Starts the execution of a new task in a different process. A task consists of a
170             code reference (a closure or a reference to a subroutine) and of an arguments
171             list.
172              
173             This method will actually fork a new process and execute the given code
174             reference in the child process. For the parent process this method will return
175             automatically. The child process will start executing the code reference with
176             the given arguments.
177              
178             The parent process, the one that started the task, should wait for the child
179             process to resume. This can be performed individually on each task through the
180             method L<"Parallel::SubFork::Task/wait_for"> or for all tasks launched through
181             this instance through the method L<"wait_for_all">.
182              
183             B<NOTE:> This method requires that the caller process is the same process as the
184             one that created the instance object being called.
185              
186             Parameters:
187              
188             =over
189              
190             =item $code
191              
192             The code reference to execute.
193              
194             =item @args (optional)
195              
196             The arguments to pass to the code reference.
197              
198             =back
199              
200             =cut
201              
202             sub start {
203              
204             # Arguments: the invocant, then the code reference and its optional arguments
205 12     12 1 9990 my $self = shift;
206 12         26 my ($code, @args) = @_;
207              
208             # Stop if this is not the dispatcher (croaks when the current PID differs
                # from the one recorded by the constructor)
209 12         87 $self->_assert_is_dispatcher();
210              
211              
212             # Start the task and remember it
213 12         177 my $task;
                # The trailing "1;" makes the eval return true on success, so the
                # "or do" block only runs when starting the task died.
214             eval {
215 12         63 $task = Parallel::SubFork::Task->start($code, @args);
216 7         143 1;
217 12 100       23 } or do {
                # Re-throw through Carp so the error is reported from the caller's
                # perspective.
218 5         936 croak $@;
219             };
                # Append directly to the underlying hash slot; the push autovivifies
                # the array ref when the slot is still undefined.
220 7         24 push @{ $self->{tasks} }, $task;
  7         165  
221            
222 7         166 return $task;
223             }
224              
225              
226             =head2 wait_for_all
227              
228             This method waits for all tasks started so far and returns when they all have
229             resumed. This is useful for creating a rally point for multiple tasks.
230              
231             B<NOTE:> This method requires that the caller process is the same process as the
232             one that created the instance object being called.
233              
234             =cut
235              
236             sub wait_for_all {
237 2     2 1 82 my $self = shift;
238              
                # Only the process that created this instance may wait on its tasks;
                # the assertion croaks otherwise.
239 2         55 $self->_assert_is_dispatcher();
240              
                # tasks() returns a plain list, so it can be iterated directly.
241 2         67 foreach my $task ($self->tasks) {
                # The trailing "1;" makes the eval return true on success, so the
                # "or do" block only runs when wait_for() died.
242             eval {
243 6         48 $task->wait_for();
244 6         44 1;
245 6 50       27 } or do {
                # Re-throw through Carp so the error is reported from the caller's
                # perspective. Coverage: this statement was never executed (count 0).
246 0         0 croak $@;
247             };
248             }
249             }
250              
251              
252             =head2 tasks
253              
254             Returns the tasks started so far by this instance. This method returns a list
255             and not an array ref.
256              
257             =cut
258              
259             sub tasks {
260 11     11 1 7433 my $self = shift;
261              
                # Read the hash slot directly; any further arguments are ignored.
262 11         80 my $tasks = $self->{tasks};
                # Copy the elements into a fresh array so callers receive a list, or
                # an empty list when no task list has been stored yet.
263 11 100       38 my @tasks = defined $tasks ? @{ $tasks } : ();
  5         35  
264 11         98 return @tasks;
265             }
266              
267              
268             =head2 _assert_is_dispatcher
269              
270             Used to check if the current process is the same one that invoked the
271             constructor.
272              
273             This is required as only the dispatcher process is allowed to start and wait for
274             tasks.
275              
276             =cut
277              
278             sub _assert_is_dispatcher {
279 15     15   432 my $self = shift;
                # Nothing to do when the current PID ($$) matches the PID recorded by
                # the constructor.
280 15 50       76 return if $self->_dispatcher_pid == $$;
                # Otherwise fail from the caller's perspective.
                # Coverage: this statement was never executed (count 0).
281 0           croak "Process $$ is not the main dispatcher";
282             }
283              
284              
285             # Return a true value
286             1;
287              
288              
289             =head1 NOTES
290              
291             The API is not yet frozen and could change as the module goes public.
292              
293             =head1 SEE ALSO
294              
295             Take a look at L<POE> for asynchronous multitasking and networking.
296              
297             =head1 AUTHOR
298              
299             Emmanuel Rodriguez, E<lt>emmanuel.rodriguez@gmail.comE<gt>
300              
301             =head1 COPYRIGHT AND LICENSE
302              
303             Copyright (C) 2008-2010 by Emmanuel Rodriguez
304              
305             This library is free software; you can redistribute it and/or modify
306             it under the same terms as Perl itself, either Perl version 5.8.8 or,
307             at your option, any later version of Perl 5 you may have available.
308              
309             =cut