File Coverage

blib/lib/Parallel/PreForkManager.pm
Criterion Covered Total %
statement 189 199 95.4
branch 53 64 82.8
condition 1 3 33.3
subroutine 19 20 95.0
pod 11 11 100.0
total 273 297 92.2


line stmt bran cond sub pod time code
1             package Parallel::PreForkManager;
2              
3 514     514   289956 use strict;
  514         3413  
  514         10875  
4 514     514   2074 use warnings;
  514         941  
  514         14959  
5              
6             our $VERSION = '1.20180106'; # VERSION
7              
8 514     514   2071 use Carp;
  514         675  
  514         25646  
9 514     514   218422 use IO::Handle;
  514         2411590  
  514         17998  
10 514     514   190081 use IO::Select;
  514         650305  
  514         19540  
11 514     514   248251 use JSON;
  514         4957583  
  514         1773  
12 514     514   253123 use English qw( -no_match_vars );
  514         1288515  
  514         2206  
13              
14             my $DEBUG = 0;
15              
16             sub new {
17 731     731 1 859350 my ( $Class, $Args ) = @_;
18              
19 731 100       3504 croak "No ChildHandler set" if ! exists ( $Args->{'ChildHandler'} );
20              
21             my $Self = {
22 730         10852 'ChildHandler' => $Args->{'ChildHandler'},
23             'ChildCount' => 10,
24             'Timeout' => 0,
25             'WaitComplete' => 1,
26             'JobQueue' => [],
27             'Select' => IO::Select->new(),
28             'ChildrenForked' => 0,
29             };
30              
31 730         15125 foreach my $Arg ( qw { Timeout ChildCount WaitComplete ParentCallback ProgressCallback JobsPerChild ChildSetupHook ChildTeardownHook } ) {
32 5840 100       13565 $Self->{ $Arg } = $Args->{ $Arg } if exists ( $Args->{ $Arg } );
33             }
34              
35 730   33     6523 bless $Self, ref($Class) || $Class;
36              
37 730         1800 return $Self;
38             }
39              
40             sub AddJob {
41 87904     87904 1 237834 my ( $Self, $Job ) = @_;
42 87904         97176 push @{ $Self->{'JobQueue'} }, $Job;
  87904         111936  
43 87904         123287 return;
44             }
45              
46             sub RunJobs {
47 727     727 1 35569 my ($Self) = @_;
48              
49             # If a worker dies, there's a problem
50             local $SIG{CHLD} = sub {
51 20797     20797   167511302 my $pid = wait();
52 20797 50       141475646 if ( exists ( $Self->{'ToChild'}->{$pid} ) ) {
53 0         0 confess("Worker $pid died.");
54             }
55 727         21838 };
56              
57             # Start the workers
58 727         11096 $Self->StartChildren();
59              
60             # Read from the workers, loop until they all shut down
61 444         1248 while ( %{ $Self->{'ToChild'} } ) {
  572         10638  
62             READYLOOP:
63 447         7696 while ( my @Ready = $Self->{'Select'}->can_read() ) {
64             READLOOP:
65 28140         57348704 foreach my $fh (@Ready) {
66 148388         3737113 my $Result = $Self->Receive($fh);
67              
68 148388 50       358026 if ( !$Result ) {
69 0         0 $Self->{'Select'}->remove($fh);
70 0         0 print STDERR "$fh got eof\n";
71 0         0 next READLOOP;
72             }
73              
74 148388         480475 my $ResultMethod = $Result->{ 'Method' };
75 148388 50       323308 warn "Parent working on Method $ResultMethod\n" if $DEBUG;
76              
77             # Handle the initial request for work
78 148388 100       345375 if ( $ResultMethod eq 'Startup' ) {
79 47287 100       70392 if ( $#{ $Self->{'JobQueue'} } > -1 ) {
  47287         142765  
80             #my $Child = $Self->{ 'ToChild' }->{ $Result->{ 'pid' } };
81 46958         59698 my $NextJob = shift( @{ $Self->{'JobQueue'} } );
  46958         168417  
82 46958         7052341 $Self->Send( $Self->{'ToChild'}->{ $Result->{'pid'} }, { 'Job' => $NextJob, }, );
83 46958         434060 next READLOOP;
84             }
85             else {
86             # Nothing to do, shut down
87 329         4891 $Self->{'Select'}->remove($fh);
88 329         15475 my $fh = $Self->{'ToChild'}->{ $Result->{'pid'} };
89 329         1313 delete( $Self->{'ToChild'}->{ $Result->{'pid'} } );
90 329         3172 $Self->Send( $fh, { 'Shutdown' => 1, }, );
91 329         4827 close($fh);
92             }
93             }
94              
95             # Process the result handler
96 101430 100       6623946 if ( $ResultMethod eq 'Completed' ) {
97              
98             # The child has completed it's work, process the results.
99 50442 100       323694 if ( exists( $Self->{'ParentCallback'} ) ) {
100 50182         567389 $Self->{ 'Result' } = $Result;
101 50182         111014 &{ $Self->{'ParentCallback'} }( $Self, $Result->{ 'Data' } );
  50182         8716362  
102 50182         1116744 delete $Self->{ 'Result' };
103             }
104              
105             # If the child has reached its processing limit then shut it down
106 50442 100       127577 if ( exists( $Result->{'JobsPerChildLimitReached'} ) ) {
107 45139         197385 $Self->{'Select'}->remove($fh);
108 45139         7632681 my $fh = $Self->{'ToChild'}->{ $Result->{'pid'} };
109 45139         244621 delete( $Self->{'ToChild'}->{ $Result->{'pid'} } );
110 45139         686974 $Self->Send( $fh, { 'Shutdown' => 1, }, );
111 45139         698250 close($fh);
112             # If there are still jobs to be done then start a new child
113 45139 100       145892 if ( $#{ $Self->{'JobQueue'} } > -1 ) {
  45139         211287  
114 44387         297095 $Self->StartChild();
115             }
116 44822         6759590 next READLOOP;
117             }
118              
119             # If there's still work to be done, send it to the child
120 5303 100       13296 if ( $#{ $Self->{'JobQueue'} } > -1 ) {
  5303         20445  
121 5141         9539 my $NextJob = shift( @{ $Self->{'JobQueue'} } );
  5141         20659  
122 5141         38251 $Self->Send( $Self->{'ToChild'}->{ $Result->{'pid'} }, { 'Job' => $NextJob, }, );
123 5141         50670 next READLOOP;
124             }
125              
126             # There is no more work to be done, shut down this child
127 162         4545 $Self->{'Select'}->remove($fh);
128 162         6397 my $fh = $Self->{'ToChild'}->{ $Result->{pid} };
129 162         869 delete( $Self->{'ToChild'}->{ $Result->{pid} } );
130 162         2676 close($fh);
131 162         2130 next READLOOP;
132             }
133              
134 50988 100       246634 if ( $ResultMethod eq 'ProgressCallback' ) {
135 50659         300909 my $Method = $Result->{'ProgressCallbackMethod'};
136 50659         150470 my $Data = $Result->{'ProgressCallbackData'};
137 50659 100       154056 if ( exists( $Self->{'ProgressCallback'}->{$Method} ) ) {
138 50658         78558 my $MethodResult = &{ $Self->{'ProgressCallback'}->{$Method} }( $Self, $Data );
  50658         358072  
139 50657         1055581 $Self->Send( $Self->{'ToChild'}->{ $Result->{'pid'} }, $MethodResult );
140              
141             }
142             else {
143 1         85 confess "Unknown callback method";
144             }
145              
146 50657         8621680 next READLOOP;
147             }
148              
149             }
150             }
151             }
152              
153 125 50       2581 if ( $Self->{ 'WaitComplete' } ) {
154 125         999 $Self->WaitComplete();
155             }
156              
157 125         74539 return;
158             }
159              
160             sub GetResult {
161 20     20 1 90 my ( $Self ) = @_;
162 20         32 return $Self->{ 'Result' };
163             }
164              
165             sub WaitComplete {
166 125     125 1 1039 my ( $Self ) = @_;
167 125         21241339 while ( ( my $pid = wait() ) != -1 ) { }
168 125         9719 return;
169             }
170              
171             sub StartChildren {
172 727     727 1 2144 my ($Self) = @_;
173              
174 727         1343 my $MaxChildren = $Self->{ 'ChildCount' };
175 727         1043 my $ActualJobs = scalar @{ $Self->{ 'JobQueue' } };
  727         1829  
176              
177 727 100       2299 if ( $ActualJobs < $MaxChildren ) {
178 21         42 $MaxChildren = $ActualJobs;
179             }
180              
181 727         6162 foreach ( 1 .. $MaxChildren ) {
182 5816         49687 $Self->StartChild();
183             }
184              
185 444         6738 return;
186             }
187              
188             sub StartChild {
189 50203     50203 1 6140762 my ($Self) = @_;
190              
191             # Open a pipe for the worker
192 50203         95587 my ( $FromParent, $FromChild, $ToParent, $ToChild );
193 50203         6015091 pipe( $FromParent, $ToChild );
194 50203         13911622 pipe( $FromChild, $ToParent );
195              
196             # Fork off a worker
197 50203         136142882 my $pid = fork();
198              
199 50203 100       9705957 if ($pid) {
    50          
200             # Parent
201 49603         12545353 $Self->{ 'ChildrenForked' }++;
202              
203             # Close unused pipes
204 49603         4368625 close($ToParent);
205 49603         2252605 close($FromParent);
206              
207 49603         1548446 $Self->{'ToChild'}->{$pid} = $ToChild;
208 49603         8801602 $Self->{'FromChild'}->{$pid} = $FromChild;
209 49603         13348562 $Self->{'Select'}->add($FromChild);
210              
211             }
212             elsif ( $pid == 0 ) {
213             # Child
214              
215 600 50       50721 warn "Child $PID spawned" if $DEBUG;
216              
217             # Close unused pipes
218 600         18383 close($FromChild);
219 600         62044 close($ToChild);
220              
221             # Setup communication pipes
222 600         14361 $Self->{'ToParent'} = $ToParent;
223 600         60925 open( STDIN, '<', '/dev/null' );
224              
225             # Send the initial request
226 600         32407 $Self->Send( $ToParent, { 'Method' => 'Startup' } );
227              
228             # Start processing
229 600         11999 $Self->Child($FromParent);
230              
231             # When the worker subroutine completes, exit
232             # &Child should already have done the exit.
233 0         0 exit 0; # uncoverable statement
234             }
235             else {
236 0         0 confess("Failed to fork: $!");
237             }
238              
239 49603         19552651 return;
240             }
241              
242             sub Child {
243 600     600 1 4063 my ( $Self, $FromParent ) = @_;
244 600         5464 $Self->{'FromParent'} = $FromParent;
245              
246 600 100       5991 if ( exists( $Self->{'ChildSetupHook'} ) ) {
247 30         69 &{ $Self->{'ChildSetupHook'} }( $Self );
  30         1048  
248             }
249              
250             # Read instructions from the parent
251 600         105368 while ( my $Instructions = $Self->Receive($FromParent) ) {
252              
253             # If the handler's children die, that's not our business
254 1118         28033 $SIG{CHLD} = 'IGNORE';
255              
256 1118 100       47641 if ( exists( $Instructions->{'Shutdown'} ) ) {
257 338 50       3119 warn "Child $PID shutdown" if $DEBUG;
258 338 100       2836 if ( exists( $Self->{'ChildTeardownHook'} ) ) {
259 20         73 &{ $Self->{'ChildTeardownHook'} }( $Self );
  20         261  
260             }
261 338         612619 exit 0;
262             }
263              
264 780         4127 my $ResultToParent = {};
265 780         6486 $ResultToParent->{ 'Request' } = $Instructions;
266              
267             # Execute the handler with the given instructions
268 780         3863 my $Result;
269 780         3714 eval {
270              
271             # Handle alarms
272             local $SIG{ALRM} = sub {
273 0     0   0 die "Child timed out.";
274 780         376772 };
275              
276             # Set alarm
277 780         10852 alarm( $Self->{'Timeout'} );
278              
279             # Execute the handler and get it's result
280 780 50       6302 if ( exists( $Self->{'ChildHandler'} ) ) {
281 780         2693 $Result = &{ $Self->{'ChildHandler'} }( $Self, $Instructions->{'Job'} );
  780         27303  
282             }
283              
284             # Disable alarm
285 671         40108117 alarm(0);
286              
287             };
288              
289             # report errors
290 680 100       26267 if (my $Error = $@) {
291 9 50       33 warn "Child $PID errored: $@" if $DEBUG;
292 9 50       111 if ( exists( $Self->{'ChildTeardownHook'} ) ) {
293 0         0 eval { &{ $Self->{'ChildTeardownHook'} }( $Self ); };
  0         0  
  0         0  
294             }
295 9         18 $ResultToParent->{ 'Method' } = 'Completed';
296 9         21 $ResultToParent->{ 'Error' } = $Error;
297             }
298             else {
299 671         10008 $ResultToParent->{ 'Method' } = 'Completed';
300 671         10357 $ResultToParent->{ 'Data' } = $Result;
301             }
302              
303 680 100       12407 if ( exists( $Self->{'JobsPerChild'} ) ) {
304 550         5772 $Self->{'JobsPerChild'} = $Self->{'JobsPerChild'} - 1;
305 550 100       16921 if ( $Self->{'JobsPerChild'} == 0 ) {
306 327         2980 $ResultToParent->{'JobsPerChildLimitReached'} = 1;
307             }
308             }
309              
310             # Send the result to the server
311 680         15993 $Self->Send( $Self->{'ToParent'}, $ResultToParent );
312             }
313              
314 162 100       8950 if ( exists( $Self->{'ChildTeardownHook'} ) ) {
315 10         223 &{ $Self->{'ChildTeardownHook'} }( $Self );
  10         282  
316             }
317              
318 162 50       6519 warn "Child $PID completed" if $DEBUG;
319 162         1936758 exit 0;
320             }
321              
322             sub ProgressCallback {
323 480     480 1 15148 my ( $Self, $Method, $Data ) = @_;
324 480         9507 $Self->Send( $Self->{'ToParent'}, {
325             'Method' => 'ProgressCallback',
326             'ProgressCallbackMethod' => $Method,
327             'ProgressCallbackData' => $Data,
328             } );
329 480         3306 my $Result = $Self->Receive( $Self->{'FromParent'} );
330 480         3192 return $Result;
331             }
332              
333             sub Receive {
334 150148     150148 1 3398981 my ( $Self, $fh ) = @_;
335              
336             # Get a value from the file handle
337 150148         230375 my $Value;
338             my $Char;
339 150148         98756921 while ( read( $fh, $Char, 1 ) ) {
340 15005494 100       104717320 if ( $Char eq "\n" ) {
341 149986         279139 last;
342             }
343 14855508         236825691 $Value .= $Char;
344             }
345              
346             # Deserialize the data
347 150148         456078 my $Data = eval { decode_json($Value) };
  150148         31791657  
348              
349 150148         5508246 return $Data;
350             }
351              
352             sub Send {
353 149984     149984 1 1427210 my ( $Self, $fh, $Value ) = @_;
354              
355 149984         4959033 $Value->{'pid'} = $PID;
356              
357 149984         1638588 my $Encoded = encode_json($Value);
358 149984         22907143 print $fh "$Encoded\n";
359              
360             # Force the file handle to flush
361 149984         187460168 $fh->flush();
362              
363 149984         844525 return;
364             }
365              
366             1;
367              
368             __END__