File Coverage

blib/lib/Parallel/Fork/BossWorker.pm
Criterion Covered Total %
statement 116 127 91.3
branch 16 26 61.5
condition 7 14 50.0
subroutine 17 18 94.4
pod 3 7 42.8
total 159 192 82.8


line stmt bran cond sub pod time code
1             package Parallel::Fork::BossWorker;
2             #
3             # $Id: BossWorker.pm 11 2011-07-16 15:49:45Z twilde $
4             #
5              
6 6     6   149496 use 5.008008;
  6         24  
  6         228  
7 6     6   36 use strict;
  6         6  
  6         258  
8 6     6   30 use warnings;
  6         66  
  6         156  
9 6     6   30 use Carp;
  6         12  
  6         630  
10 6     6   7290 use Data::Dumper qw(Dumper);
  6         73500  
  6         498  
11 6     6   8244 use IO::Handle;
  6         49020  
  6         282  
12 6     6   6006 use IO::Select;
  6         13086  
  6         7452  
13              
14             # Perl module variables
15             our @ISA = qw();
16             our $VERSION = '0.05';
17              
18             sub new {
19 6     6 1 2796 my $class = shift;
20 6         30 my %values = @_;
21            
22 6   50     162 my $self = {
      50        
      50        
      50        
23             result_handler => $values{result_handler} || undef, # Method for handling output of the workers
24             worker_count => $values{worker_count} || 10, # Number of workers
25             global_timeout => $values{global_timeout} || 0, # Number of seconds before the worker terminates the job, 0 for unlimited
26             work_handler => $values{work_handler}, # Handler which will process the data from the boss
27             work_queue => [],
28             msg_delimiter => $values{msg_delimiter} || "\0\0\0",
29             select => IO::Select->new(),
30             };
31 6         96 $self->{msg_delimiter_length} = length($self->{msg_delimiter});
32 6   33     42 bless $self, ref($class) || $class;
33              
34             # The work handler is required
35 6 50       54 if (not defined $self->{work_handler}) {
36 0         0 croak("Parameters \`work_handler' is required.");
37             }
38              
39 6         30 return $self;
40             }
41              
42             sub add_work(\@) {
43 48     48 1 1872 my $self = shift;
44 48         42 my $work = shift;
45 48         54 unshift (@{ $self->{work_queue} }, $work);
  48         96  
46             }
47              
48             sub process {
49 6     6 1 36 my $self = shift;
50 6         12 my $handler = shift;
51            
52 6         6 eval {
53            
54             # If a worker dies, there's a problem
55             local $SIG{CHLD} = sub {
56 5     5   660847 my $pid = wait();
57 5 50       173 if (defined $self->{workers}->{$pid}) {
58 0         0 confess("Worker $pid died.");
59             }
60 6         294 };
61            
62             # Start the workers
63 6         36 $self->start();
64            
65             # Read from the workers, loop until they all shut down
66 1         27 while (%{$self->{workers}}) {
  5         72  
67 4         55 while (my @ready = $self->{select}->can_read()) {
68 8         9362303 foreach my $fh (@ready) {
69 13         74 my $result = $self->receive($fh);
70 13 50       44 if (!$result) {
71 0         0 $self->{select}->remove($fh);
72 0         0 print STDERR "$fh got eof\n";
73 0         0 next;
74             }
75            
76             # Process the result handler
77 13 100 66     83 if ($result->{data} && defined $self->{result_handler}) {
78 8         18 &{ $self->{result_handler} }( $result->{data} );
  8         178  
79             }
80            
81             # If there's still work to be done, send it to the worker, otherwise shut it down
82 13 100       42725 if ($#{ $self->{work_queue} } > -1) {
  13         85  
83 8         27 my $worker = $self->{workers}->{$result->{pid}};
84 8         35 $self->send(
85             $self->{workers}->{ $result->{pid} }, # Worker's pipe
86 8         20 pop(@{ $self->{work_queue} })
87             );
88             } else {
89 5         35 $self->{select}->remove($fh);
90 5         279 my $fh = $self->{workers}->{ $result->{pid} };
91 5         13 delete($self->{workers}->{ $result->{pid} });
92 5         13152 close($fh);
93             }
94             }
95             }
96             }
97            
98             # Wait for our children so the process table won't fill up
99 1         255437 while ((my $pid = wait()) != -1) { }
100             };
101            
102 1 50       13 if ($@) {
103 0         0 croak($@);
104             }
105             }
106              
107             sub start {
108 6     6 0 12 my $self = shift();
109            
110             # Create a pipe for the workers to communicate to the boss
111            
112             # Create the workers
113 6         18 foreach (1..$self->{worker_count}) {
114            
115             # Open a pipe for the worker
116 20         771 pipe(my $from_boss, my $to_worker);
117 20         675 pipe(my $from_worker, my $to_boss);
118            
119             # Fork off a worker
120 20         27623 my $pid = fork();
121            
122 20 100       1616 if ($pid > 0) {
    50          
123            
124             # Boss
125 15         763 $self->{workers}->{$pid} = $to_worker;
126 15         152 $self->{from_worker}->{$pid} = $from_worker;
127 15         922 $self->{select}->add($from_worker);
128              
129             # Close unused pipes
130 15         2335 close($to_boss);
131 15         901 close($from_boss);
132            
133             } elsif ($pid == 0) {
134            
135             # Worker
136            
137             # Close unused pipes
138 5         608 close($from_worker);
139 5         143 close($to_worker);
140            
141             # Setup communication pipes
142 5         203 $self->{to_boss} = $to_boss;
143 5         1216 open(STDIN, '/dev/null');
144            
145             # Send the initial request
146 5         608 $self->send($to_boss, {pid => $$});
147            
148             # Start processing
149 5         556 $self->worker($from_boss);
150            
151             # When the worker subroutine completes, exit
152 5         4626 exit;
153             } else {
154 0         0 confess("Failed to fork: $!");
155             }
156             }
157             }
158              
159             sub worker(\*) {
160 5     5 0 13 my $self = shift();
161 5         11 my $from_boss = shift();
162            
163             # Read instructions from the server
164 5         20 while (my $instructions = $self->receive($from_boss)) {
165            
166             # If the handler's children die, that's not our business
167 8         310 $SIG{CHLD} = 'IGNORE';
168            
169             # Execute the handler with the given instructions
170 8         142 my $result;
171 8         26 eval {
172             # Handle alarms
173             local $SIG{ALRM} = sub {
174 0     0   0 die "Work handler timed out."
175 8         372 };
176            
177             # Set alarm
178 8         187 alarm($self->{global_timeout});
179            
180             # Execute the handler and get it's result
181 8 50       40 if (defined $self->{work_handler}) {
182 8         16 $result = &{ $self->{work_handler} }($instructions);
  8         89  
183             }
184            
185             # Disable alarm
186 8         12011331 alarm(0);
187             };
188            
189             # Warn on errors
190 8 50       49 if ($@) {
191 0         0 croak("Worker $$ error: $@");
192             }
193            
194             # Send the result to the server
195 8         95 $self->send($self->{to_boss}, {pid => $$, data => $result});
196             }
197             }
198              
199             sub receive(\*) {
200 26     26 0 56 my $self = shift();
201              
202             # Get the file handle
203 26         153 my $fh = shift();
204            
205             # Get a value from the file handle
206 26         52 my $value;
207             my $char;
208 26         12152653 while (read($fh, $char, 1)) {
209 1687         1975 $value .= $char;
210 1687 100       15710 if (substr($value, -($self->{msg_delimiter_length})) eq $self->{msg_delimiter}) {
211 21         76 $value = substr($value, 0, -($self->{msg_delimiter_length}));
212 21         82 last;
213             }
214             }
215            
216             # Deserialize the data
217 6     6   48 no strict;
  6         12  
  6         240  
218 6     6   36 no warnings;
  6         12  
  6         1506  
219 26         9216 my $data = eval($value);
220              
221 26 50       255 if ($@) {
222 0 0       0 print STDERR "Value: '$value'\n" if $ENV{PFBW_DEBUG};
223 0         0 confess("Failed to deserialize data: $@");
224             }
225              
226 26         145 return $data;
227             }
228              
229             sub send(\*$) {
230 21     21 0 102 my $self = shift();
231              
232             # Get the file handle
233 21         145 my $fh = shift();
234              
235             # Get the value which will be sent
236 21         68 my $value = shift();
237              
238             # Print the value to the file handle
239 21         149 local $Data::Dumper::Deepcopy = 1;
240 21         162 local $Data::Dumper::Indent = 0;
241 21         89 local $Data::Dumper::Purity = 1;
242 21         370 print $fh Dumper($value) . $self->{msg_delimiter};
243            
244             # Force the file handle to flush
245 21         8420 $fh->flush();
246             }
247              
248             1;
249             __END__