File Coverage

blib/lib/Proc/Swarm.pm
Criterion Covered Total %
statement 160 188 85.1
branch 29 50 58.0
condition 5 15 33.3
subroutine 25 29 86.2
pod 0 1 0.0
total 219 283 77.3


line stmt bran cond sub pod time code
1             #!/usr/bin/perl
2              
3              
4             package Proc::Swarm;
5             {
6             $Proc::Swarm::VERSION = '1.133280';
7             }
8              
9 262     262   707400 use strict;use warnings;
  262     262   735  
  262         8960  
  262         1558  
  262         514  
  262         9256  
10 262     262   1293016 use IPC::Msg;
  262         5725755  
  262         9270  
11 262     262   349467 use Storable;
  262         108938711  
  262         589771  
12              
13             sub _usage {
14 0 0   0   0 print @_ . "\n" if defined @_;
15 0         0 print q(
16             Proc::Swarm::swarm(
17             code => $coderef,
18             children => $child_count,
19             work => \@work_units,
20             [sort => 1],
21             [debug => 1] );
22             );
23 0         0 exit 255;
24             }
25              
26             sub swarm {
27 283     283 0 10430 my $args = shift;
28 283 50       1700 _usage() if ref $args ne 'HASH';
29 283         1038 my $coderef = $args->{code};
30 283         1031 my $max_children = $args->{children};
31 283         544 my @units = @{$args->{work}};
  283         2332  
32 283         787 my $sort_output = $args->{sort};
33 283         799 my $sort_code = $args->{sort_code};
34              
35 283         4243 my @work_units = @units;
36 283 50       1300 _usage('No work defined') if (scalar @work_units) == 0;
37              
38 283 50       2080 _usage('Invalid code passed') unless ref $coderef eq 'CODE';
39 283 50 33     3768 _usage('Child count argument must be a non-negative, non-zero integer')
40             if $max_children < 1 or $max_children =~ /\./;
41 283 50       1101 _usage('Work units must not contain a reference')
42             if ref $work_units[0];
43              
44             #We now have something like clean arguments.
45              
46             #We need two message queues. One that the producer listens to, and
47             #another the consumer listens to.
48              
49 283         2711 my $Qc = Proc::Swarm::Queue->new; #consumer
50 283         123798 my $Qp = Proc::Swarm::Queue->new; #producer
51              
52             #The main parent is the consumer. It will exit last.
53             #The first child is the producer.
54 283         69314860 my $pid = fork();
55 283 50       38255 if(not defined $pid) { #fork failed
    100          
56 0         0 die 'Fork failed. Check your system resources.';
57             } elsif(not $pid) { #Child (producer)
58 256         3800 my $worker_count = 0;
59 256         4986 my $another_count = 0;
60             #first we spin off enough children to max out the count.
61 256         14138 for (1..$max_children) {
62 6660         275859 _worker(pop @work_units, $coderef, $Qc);
63 6544         126504 $worker_count++;
64             }
65              
66             #Now we should have $max_children processes. Wait for them
67             #to finish.
68 140         18592 while(1) {
69             #We are expecting one of:
70             # requests to spawn another worker from the consumer
71             # requests from workers to add objects to work list
72             # requests from workers to remove objects from work list
73            
74 3805         450480 my $package = $Qp->receive;
75 3805 50       69236 if($package->get_type eq 'another') {
    0          
    0          
76 3805         5026 $another_count++;
77 3805 100       18102 if($another_count == $worker_count) {
78             #We are now done.
79 6         266 $Qc->send(Proc::Swarm::Package->new(undef, 'end'));
80 6         0 exit;
81             }
82            
83 3799 100       15509 if((scalar @work_units) != 0) {
84 3747         48133 _worker(pop(@work_units), $coderef, $Qc);
85 3613         284152 $worker_count++;
86             }
87             } elsif($package->get_type eq 'del') {
88             #find $package->get_object in @work_units and
89             #remove it
90 0         0 my @work_units_tmp;
91             my @new_work_units;
92 0         0 foreach my $work_object (@work_units) {
93 0 0       0 push @new_work_units, $work_object
94             unless $work_object eq $package->get_object;
95             }
96 0         0 undef @work_units;
97 0         0 foreach (@new_work_units) { push @work_units, $_; }
  0         0  
98              
99             } elsif($package->get_type eq 'new') {
100             #add $package->get_object into @work_units
101 0         0 push @work_units, $package->get_object;
102             }
103             }
104             } else { #Parent (consumer)
105 27         273 my @results;
106            
107 27         80 while(1) {
108             #We are expecting messages from the workers here.
109             #For each worker message, we want to record the result
110             #and inform the producer to spawn another worker.
111 236         13191 my $package = $Qc->receive;
112 236 100       2169 if($package->get_type eq 'res') {
    50          
113 209         7610 push @results, $package->get_object;
114             #Tell the producer to spawn another worker.
115 209         2961 $Qp->send(
116             Proc::Swarm::Package->new(undef, 'another'));
117             } elsif($package->get_type eq 'end') {
118             #This is a message from the producer that
119             #it is finished spawning workers.
120              
121             #We will only get this message when we are
122             #sure all of the workers are finished.
123 27 100       148 if(defined($sort_output)) {
124 23         309 @results = _sort_results($sort_code, \@results, \@units);
125             }
126 27         148 $Qc->cleanup;
127 27         3970 $Qp->cleanup;
128 27         364 return Proc::Swarm::Results->new(@results);
129             }
130             }
131             }
132             }
133              
134             sub _sort_results {
135 23     23   69 my ($sort_code,$results_ref,$units_ref) = @_;
136              
137 23         229 my @units = @$units_ref;
138 23         70 my @results = @$results_ref;
139 23         68 my %sort_hash;
140 23         24 { my $i = 0;
  23         24  
141 23         161 %sort_hash = map { $units[$i], $i++ } @units;
  91         953  
142             }
143              
144 23 50       293 $sort_code = q(
145             sub { $sort_hash{$a->get_object}
146             <=>
147             $sort_hash{$b->get_object}
148             };
149             ) unless defined $sort_code;
150              
151 23         8991 my $sort_coderef = eval $sort_code;
152              
153 23         987 @results = sort $sort_coderef @results;
154 23         301 return @results;
155             }
156              
157             #this function should immediately return.
158             sub _worker {
159 10407     10407   43725 my ($object,$coderef,$Qc) = @_;
160              
161 10407         20618 my ($Qp,$pid);
162             #the classic double fork.
163 10407 100       20374009 unless ($pid = fork) {
164 250 100       695136 unless (fork) {
165 125         13346 _worker_worker($object, $coderef, $Qc, $Qp);
166 125         0 exit 0;
167             }
168 125         0 exit 0;
169             }
170 10157         8244605913 waitpid $pid,0;
171             }
172              
173             sub _worker_worker {
174 125     125   7109 my ($object,$coderef,$Qc,$Qp) = @_;
175 125         2491 my $start = scalar time;
176 125         10551 my ($retval,$result_type);
177              
178 125         799206 eval {
179 125         12094 $retval = &$coderef($object);
180             };
181 125 100       1010126063 if($@) {
182 2         36 $result_type = 'error';
183 2         7 $retval = $@;
184             } else {
185 123         10101 $result_type = 'good';
186             }
187 125         1506 my $end = scalar time;
188 125         962621 my $result = Proc::Swarm::Result->new(($end-$start), $object, $retval, $result_type);
189 125         5321 my $package = Proc::Swarm::Package->new($result, 'res');
190              
191 125         8842681 $Qc->send($package);
192             }
193              
194             package Proc::Swarm::Package;
195             {
196             $Proc::Swarm::Package::VERSION = '1.133280';
197             }
198              
199             sub new {
200 340     340   3411 my ($proto,$object,$type) = @_;
201              
202              
203 340   33     385710 my $class = ref($proto) || $proto;
204 340         1522 my $self = {};
205 340         5087 $self->{type} = $type;
206 340         1069 $self->{obj} = $object;
207              
208 340         2774 bless $self, $class;
209 340         4537 return $self;
210             }
211              
212             sub get_type {
213 4068     4068   12168 my $self = shift;
214 4068         24204 return $self->{type};
215             }
216              
217             sub get_object {
218 209     209   489 my $self = shift;
219 209         1143 return $self->{obj};
220             }
221             package Proc::Swarm::Results;
222             {
223             $Proc::Swarm::Results::VERSION = '1.133280';
224             }
225              
226             sub new {
227 27     27   91 my $proto = shift;
228 27   33     219 my $class = ref($proto) || $proto;
229              
230 27         93 my @results = @_;
231              
232 27         36 my $self = {};
233 27         263 $self->{results} = \@results;
234 27         84 bless $self, $class;
235 27         2329 return $self;
236             }
237              
238             sub get_result_count {
239 0     0   0 my $self = shift;
240 0 0       0 return $self->{count} if defined $self->{count};
241 0         0 $self->{count} = scalar @{$self->{results}};
  0         0  
242 0         0 return $self->{count};
243             }
244              
245             sub get_result {
246 1     1   25 my $self = shift;
247 1         3 my $object_id = shift;
248              
249 1         5 foreach my $result (@{$self->{results}}) {
  1         15  
250 4 100       12 return $result
251             if $result->get_object eq $object_id;
252             }
253 0         0 return undef;
254             }
255              
256             sub get_result_objects {
257 25     25   1451 my $self = shift;
258 25 50       215 return @{$self->{objects}} if defined $self->{objects};
  0         0  
259              
260 25         28 my @objects;
261 25         61 foreach my $result (@{$self->{results}}) {
  25         260  
262 202         364 push @objects, $result->get_result;
263             }
264 25         247 $self->{objects} = \@objects;
265 25         197 return @objects;
266             }
267              
268             sub get_results {
269 0     0   0 my $self = shift;
270 0         0 return @{$self->{results}};
  0         0  
271             }
272              
273             sub get_result_times {
274 1     1   36 my $self = shift;
275            
276 1 50       9 return @{$self->{times}} if defined $self->{times};
  0         0  
277              
278 1         2 my @times;
279 1         1 foreach my $result (@{$self->{results}}) {
  1         10  
280 3         7 push @times, $result->get_runtime;
281             }
282 1         10 $self->{times} = \@times;
283 1         4 return @times;
284             }
285              
286             sub get_objects {
287 0     0   0 my $self = shift;
288              
289 0         0 my @objects;
290 0         0 foreach my $result (@{$self->{results}}) {
  0         0  
291 0         0 push @objects, $result->get_object;
292             }
293              
294 0         0 return @objects;
295             }
296              
297             package Proc::Swarm::Result;
298             {
299             $Proc::Swarm::Result::VERSION = '1.133280';
300             }
301              
302             sub new {
303 125     125   2357 my $proto = shift;
304 125   33     16805 my $class = ref($proto) || $proto;
305              
306 125         2973 my $self = {};
307 125         8863 ( $self->{runtime},
308             $self->{object},
309             $self->{result},$self->{result_type}
310             ) = @_;
311 125         3043 bless $self, $class;
312 125         3746 return $self;
313             }
314              
315             sub get_runtime {
316 3     3   4 my $self = shift;
317 3         7 return $self->{runtime};
318             }
319             sub
320             get_object {
321 228     228   281 my $self = shift;
322 228         2691 return $self->{object};
323             }
324              
325             sub get_result {
326 202     202   208 my $self = shift;
327 202         608 return $self->{result};
328             }
329              
330             sub get_result_type {
331 1     1   3 my $self = shift;
332 1         5 return $self->{result_type};
333             }
334              
335              
336             package Proc::Swarm::Queue;
337             {
338             $Proc::Swarm::Queue::VERSION = '1.133280';
339             }
340              
341             sub new {
342 566     566   1101 my $proto = shift;
343 566   33     3127 my $class = ref($proto) || $proto;
344              
345 262     262   5264 use IPC::SysV qw(IPC_PRIVATE S_IRWXU);
  262         541  
  262         156691  
346              
347 566         1980 my $self = {};
348              
349 566         7739 $self->{Q} = IPC::Msg->new(IPC_PRIVATE, S_IRWXU);
350              
351 566         29985 bless $self, $class;
352 566         18892 return $self;
353             }
354              
355             #We can't define a DESTROY method because this class goes out of scope a
356             #number of times before we actually want to remove the queues.
357             sub cleanup {
358 54     54   91 my $self = shift;
359 54         420 $self->{Q}->remove;
360             }
361              
362             sub send {
363 340     340   840757 my ($self,$obj) = @_;
364 340         5110 my $frozen_obj = Storable::freeze($obj);
365 340         3311520 return $self->{Q}->snd(1, $frozen_obj); #Message type '1'
366             }
367              
368             sub receive {
369 4041     4041   15659 my $self = shift;
370 4041         8438 my $in_buf;
371 4041         222096 my $thing = $self->{Q}->rcv($in_buf, 10240000);#This grabs any message type.
372 4041         531515392 my $thawed_thing = Storable::thaw $in_buf;
373 4041         324068 return $thawed_thing;
374             }
375              
376             1;
377              
378              
379             __END__