File Coverage

blib/lib/Parallel/PreForkManager.pm
Criterion Covered Total %
statement 156 188 82.9
branch 37 62 59.6
condition 5 12 41.6
subroutine 18 19 94.7
pod 10 10 100.0
total 226 291 77.6


line stmt bran cond sub pod time code
1             package Parallel::PreForkManager;
2              
3 16     16   25155 use strict;
  16         16  
  16         348  
4 16     16   61 use warnings;
  16         16  
  16         492  
5              
6             our $VERSION = '1.20170415'; # VERSION
7              
8 16     16   49 use Carp;
  16         20  
  16         785  
9 16     16   7351 use IO::Handle;
  16         71060  
  16         595  
10 16     16   6693 use IO::Select;
  16         17647  
  16         615  
11 16     16   7817 use JSON;
  16         151516  
  16         68  
12 16     16   9097 use English qw( -no_match_vars );
  16         43817  
  16         81  
13              
14             my $DEBUG = 0;
15              
16             sub new {
17 11     11 1 1760 my ( $Class, $Args ) = @_;
18              
19 11 50       44 croak "No ChildHandler set" if ! exists ( $Args->{'ChildHandler'} );
20              
21             my $Self = {
22             'ChildHandler' => $Args->{'ChildHandler'},
23             'ChildCount' => $Args->{'ChildCount'} || 10,
24             'Timeout' => $Args->{'Timeout'} || 0,
25 11   50     198 'WaitComplete' => $Args->{'WaitComplete'} || 1,
      50        
      50        
26             'JobQueue' => [],
27             'Select' => IO::Select->new(),
28             };
29              
30 11         121 foreach my $Arg ( qw { ParentCallback ProgressCallback JobsPerChild ChildSetupHook ChildTeardownHook } ) {
31 55 100       132 $Self->{ $Arg } = $Args->{ $Arg } if exists ( $Args->{ $Arg } );
32             }
33              
34 11   33     66 bless $Self, ref($Class) || $Class;
35              
36 11         22 return $Self;
37             }
38              
39             sub AddJob {
40 220     220 1 385 my ( $Self, $Job ) = @_;
41 220         220 push @{ $Self->{'JobQueue'} }, $Job;
  220         209  
42 220         187 return;
43             }
44              
45             sub RunJobs {
46 11     11 1 44 my ($Self) = @_;
47              
48             # If a worker dies, there's a problem
49             local $SIG{CHLD} = sub {
50 7     7   91653 my $pid = wait();
51 7 50       39975 if ( exists ( $Self->{'ToChild'}->{$pid} ) ) {
52 0         0 confess("Worker $pid died.");
53             }
54 11         253 };
55              
56             # Start the workers
57 11         44 $Self->StartChildren();
58              
59             # Read from the workers, loop until they all shut down
60 9         90 while ( %{ $Self->{'ToChild'} } ) {
  10         662  
61             READYLOOP:
62 9         72 while ( my @Ready = $Self->{'Select'}->can_read() ) {
63             READLOOP:
64 241         27738 foreach my $fh (@Ready) {
65 246         799 my $Result = $Self->Receive($fh);
66              
67 246 50       422 if ( !$Result ) {
68 0         0 $Self->{'Select'}->remove($fh);
69 0         0 print STDERR "$fh got eof\n";
70 0         0 next READLOOP;
71             }
72              
73 246         517 my $ResultMethod = $Result->{ 'Method' };
74 246 50       500 warn "Parent working on Method $ResultMethod\n" if $DEBUG;
75              
76             # Handle the initial request for work
77 246 100       600 if ( $ResultMethod eq 'Startup' ) {
78 50 50       87 if ( $#{ $Self->{'JobQueue'} } > -1 ) {
  50         149  
79             #my $Child = $Self->{ 'ToChild' }->{ $Result->{ 'pid' } };
80 50         44 my $NextJob = shift( @{ $Self->{'JobQueue'} } );
  50         97  
81 50         278 $Self->Send( $Self->{'ToChild'}->{ $Result->{'pid'} }, { 'Job' => $NextJob, }, );
82 50         532 next READLOOP;
83             }
84             else {
85             # Nothing to do, shut down
86 0         0 $Self->{'Select'}->remove($fh);
87 0         0 my $fh = $Self->{'ToChild'}->{ $Result->{'pid'} };
88 0         0 delete( $Self->{'ToChild'}->{ $Result->{'pid'} } );
89 0         0 $Self->Send( $fh, { 'Shutdown' => 1, }, );
90 0         0 close($fh);
91             }
92             }
93              
94             # Process the result handler
95 196 100       638 if ( $ResultMethod eq 'Completed' ) {
96             # The child has completed it's work, process the results.
97 96 50 33     885 if ( $Result->{'Data'} && exists( $Self->{'ParentCallback'} ) ) {
98 96         154 &{ $Self->{'ParentCallback'} }( $Self, $Result->{'Data'} );
  96         344  
99             }
100              
101             # If the child has reached its processing limit then shut it down
102 96 100       1061 if ( exists( $Result->{'JobsPerChildLimitReached'} ) ) {
103 46         149 $Self->{'Select'}->remove($fh);
104 46         1322 my $fh = $Self->{'ToChild'}->{ $Result->{'pid'} };
105 46         124 delete( $Self->{'ToChild'}->{ $Result->{'pid'} } );
106 46         641 $Self->Send( $fh, { 'Shutdown' => 1, }, );
107 46         272 close($fh);
108             # If there are still jobs to be done then start a new child
109 46 100       47 if ( $#{ $Self->{'JobQueue'} } > -1 ) {
  46         174  
110 44         100 $Self->StartChild();
111             }
112 38         2616 next READLOOP;
113             }
114              
115             # If there's still work to be done, send it to the child
116 50 50       37 if ( $#{ $Self->{'JobQueue'} } > -1 ) {
  50         156  
117 50         39 my $NextJob = shift( @{ $Self->{'JobQueue'} } );
  50         146  
118 50         178 $Self->Send( $Self->{'ToChild'}->{ $Result->{'pid'} }, { 'Job' => $NextJob, }, );
119 50         406 next READLOOP;
120             }
121              
122             # There is no more work to be done, shut down this child
123 0         0 $Self->{'Select'}->remove($fh);
124 0         0 my $fh = $Self->{'ToChild'}->{ $Result->{pid} };
125 0         0 delete( $Self->{'ToChild'}->{ $Result->{pid} } );
126 0         0 close($fh);
127 0         0 next READLOOP;
128             }
129              
130 100 50       2728 if ( $ResultMethod eq 'ProgressCallback' ) {
131 100         371 my $Method = $Result->{'ProgressCallbackMethod'};
132 100         409 my $Data = $Result->{'ProgressCallbackData'};
133 100 50       427 if ( exists( $Self->{'ProgressCallback'}->{$Method} ) ) {
134 100         157 my $MethodResult = &{ $Self->{'ProgressCallback'}->{$Method} }( $Self, $Data );
  100         460  
135 100         1563 $Self->Send( $Self->{'ToChild'}->{ $Result->{'pid'} }, $MethodResult );
136              
137             }
138             else {
139 0         0 confess "Unknown callback method";
140             }
141              
142 100         628 next READLOOP;
143             }
144              
145             }
146             }
147             }
148              
149 1 50       16 if ( $Self->{ 'WaitComplete' } ) {
150 1         7 $Self->WaitComplete();
151             }
152              
153 1         22 return;
154             }
155              
156             sub WaitComplete {
157 1     1 1 3 my ( $Self ) = @_;
158 1         139361 while ( ( my $pid = wait() ) != -1 ) { }
159 1         4 return;
160             }
161              
162             sub StartChildren {
163 11     11 1 22 my ($Self) = @_;
164              
165 11         11 my $MaxChildren = $Self->{ 'ChildCount' };
166 11         22 my $ActualJobs = scalar @{ $Self->{ 'JobQueue' } };
  11         11  
167              
168 11 50       33 if ( $ActualJobs < $MaxChildren ) {
169 0         0 $MaxChildren = $ActualJobs;
170             }
171              
172 11         33 foreach ( 1 .. $MaxChildren ) {
173 21         53 $Self->StartChild();
174             }
175              
176 9         45 return;
177             }
178              
179             sub StartChild {
180 65     65 1 85 my ($Self) = @_;
181              
182             # Open a pipe for the worker
183 65         55 my ( $FromParent, $FromChild, $ToParent, $ToChild );
184 65         1541 pipe( $FromParent, $ToChild );
185 65         695 pipe( $FromChild, $ToParent );
186              
187             # Fork off a worker
188 65         34550 my $pid = fork();
189              
190 65 100       1845 if ($pid) {
    50          
191             # Parent
192              
193             # Close unused pipes
194 55         1083 close($ToParent);
195 55         514 close($FromParent);
196              
197 55         1210 $Self->{'ToChild'}->{$pid} = $ToChild;
198 55         436 $Self->{'FromChild'}->{$pid} = $FromChild;
199 55         1872 $Self->{'Select'}->add($FromChild);
200              
201             }
202             elsif ( $pid == 0 ) {
203             # Child
204              
205 10 50       290 warn "Child $PID spawned" if $DEBUG;
206              
207             # Close unused pipes
208 10         347 close($FromChild);
209 10         248 close($ToChild);
210              
211             # Setup communication pipes
212 10         163 $Self->{'ToParent'} = $ToParent;
213 10         857 open( STDIN, '<', '/dev/null' );
214              
215             # Send the initial request
216 10         339 $Self->Send( $ToParent, { 'Method' => 'Startup' } );
217              
218             # Start processing
219 10         88 $Self->Child($FromParent);
220              
221             # When the worker subroutine completes, exit
222 0         0 exit 0;
223             }
224             else {
225 0         0 confess("Failed to fork: $!");
226             }
227              
228 55         6806 return;
229             }
230              
231             sub Child {
232 10     10 1 23 my ( $Self, $FromParent ) = @_;
233 10         64 $Self->{'FromParent'} = $FromParent;
234              
235 10 50       57 if ( exists( $Self->{'ChildSetupHook'} ) ) {
236 0         0 &{ $Self->{'ChildSetupHook'} }( $Self );
  0         0  
237             }
238              
239             # Read instructions from the parent
240 10         643 while ( my $Instructions = $Self->Receive($FromParent) ) {
241              
242             # If the handler's children die, that's not our business
243 30         370 $SIG{CHLD} = 'IGNORE';
244              
245 30 100       352 if ( exists( $Instructions->{'Shutdown'} ) ) {
246 10 50       65 warn "Child $PID shutdown" if $DEBUG;
247 10 50       59 if ( exists( $Self->{'ChildTeardownHook'} ) ) {
248 0         0 &{ $Self->{'ChildTeardownHook'} }( $Self );
  0         0  
249             }
250 10         3187 exit 0;
251             }
252              
253             # Execute the handler with the given instructions
254 20         37 my $Result;
255 20         60 eval {
256              
257             # Handle alarms
258             local $SIG{ALRM} = sub {
259 0     0   0 die "Child timed out.";
260 20         422 };
261              
262             # Set alarm
263 20         125 alarm( $Self->{'Timeout'} );
264              
265             # Execute the handler and get it's result
266 20 50       137 if ( exists( $Self->{'ChildHandler'} ) ) {
267 20         57 $Result = &{ $Self->{'ChildHandler'} }( $Self, $Instructions->{'Job'} );
  20         197  
268             }
269              
270             # Disable alarm
271 20         287 alarm(0);
272              
273             };
274              
275             # Warn on errors
276 20 50       52 if ($@) {
277 0 0       0 if ( exists( $Self->{'ChildTeardownHook'} ) ) {
278 0         0 &{ $Self->{'ChildTeardownHook'} }( $Self );
  0         0  
279             }
280 0         0 croak("Child $PID error: $@");
281             }
282              
283 20         181 my $ResultToParent = {
284             'Method' => 'Completed',
285             'Data' => $Result,
286             };
287              
288 20 50       65 if ( exists( $Self->{'JobsPerChild'} ) ) {
289 20         69 $Self->{'JobsPerChild'} = $Self->{'JobsPerChild'} - 1;
290 20 100       94 if ( $Self->{'JobsPerChild'} == 0 ) {
291 10         58 $ResultToParent->{'JobsPerChildLimitReached'} = 1;
292             }
293             }
294              
295             # Send the result to the server
296 20         67 $Self->Send( $Self->{'ToParent'}, $ResultToParent );
297             }
298              
299 0 0       0 if ( exists( $Self->{'ChildTeardownHook'} ) ) {
300 0         0 &{ $Self->{'ChildTeardownHook'} }( $Self );
  0         0  
301             }
302              
303 0 0       0 warn "Child $PID completed" if $DEBUG;
304 0         0 exit 0;
305             }
306              
307             sub ProgressCallback {
308 20     20 1 348 my ( $Self, $Method, $Data ) = @_;
309 20         220 $Self->Send( $Self->{'ToParent'}, {
310             'Method' => 'ProgressCallback',
311             'ProgressCallbackMethod' => $Method,
312             'ProgressCallbackData' => $Data,
313             } );
314 20         88 my $Result = $Self->Receive( $Self->{'FromParent'} );
315 20         73 return $Result;
316             }
317              
318             sub Receive {
319 296     296 1 290 my ( $Self, $fh ) = @_;
320              
321             # Get a value from the file handle
322 296         249 my $Value;
323             my $Char;
324 296         69910 while ( read( $fh, $Char, 1 ) ) {
325 22782 100       34050 if ( $Char eq "\n" ) {
326 296         405 last;
327             }
328 22486         47679 $Value .= $Char;
329             }
330              
331             # Deserialize the data
332 296         548 my $Data = eval { decode_json($Value) };
  296         3339  
333              
334 296         734 return $Data;
335             }
336              
337             sub Send {
338 296     296 1 514 my ( $Self, $fh, $Value ) = @_;
339              
340 296         880 $Value->{'pid'} = $PID;
341              
342 296         1761 my $Encoded = encode_json($Value);
343 296         1443 print $fh "$Encoded\n";
344              
345             # Force the file handle to flush
346 296         6870 $fh->flush();
347              
348 296         675 return;
349             }
350              
351             1;
352              
353             __END__