File Coverage

blib/lib/Proc/Swarm.pm
Criterion Covered Total %
statement 160 188 85.1
branch 29 50 58.0
condition 5 15 33.3
subroutine 25 29 86.2
pod 0 1 0.0
total 219 283 77.3


line stmt bran cond sub pod time code
1             #!/usr/bin/perl
2              
3              
4             package Proc::Swarm;
5             $Proc::Swarm::VERSION = '1.161060';
6 258     258   118477 use strict;use warnings;
  258     258   305  
  258         5863  
  258         766  
  258         268  
  258         5243  
7 258     258   104953 use IPC::Msg;
  258         1142641  
  258         6295  
8 258     258   132967 use Storable;
  258         572593  
  258         313705  
9              
10             sub _usage {
11 0 0   0   0 print @_ . "\n" if @_;
12 0         0 print q(
13             Proc::Swarm::swarm(
14             code => $coderef,
15             children => $child_count,
16             work => \@work_units,
17             [sort => 1],
18             [debug => 1] );
19             );
20 0         0 exit 255;
21             }
22              
23             sub swarm {
24 277     277 0 165008 my $args = shift;
25 277 50       1400 _usage() if ref $args ne 'HASH';
26 277         366 my $coderef = $args->{code};
27 277         315 my $max_children = $args->{children};
28 277         285 my @units = @{$args->{work}};
  277         1821  
29 277         544 my $sort_output = $args->{sort};
30 277         297 my $sort_code = $args->{sort_code};
31              
32 277         1350 my @work_units = @units;
33 277 50       626 _usage('No work defined') if (scalar @work_units) == 0;
34              
35 277 50       1172 _usage('Invalid code passed') unless ref $coderef eq 'CODE';
36 277 50 33     1937 _usage('Child count argument must be a non-negative, non-zero integer')
37             if $max_children < 1 or $max_children =~ /\./;
38 277 50       562 _usage('Work units must not contain a reference')
39             if ref $work_units[0];
40              
41             #We now have something like clean arguments.
42              
43             #We need two message queues. One that the producer listens to, and
44             #another the consumer listens to.
45              
46 277         592 my $Qc = Proc::Swarm::Queue->new; #consumer
47 277         554 my $Qp = Proc::Swarm::Queue->new; #producer
48              
49             #The main parent is the consumer. It will exit last.
50             #The first child is the producer.
51 277         162814 my $pid = fork();
52 277 50       9515 if(not defined $pid) { #fork failed
    100          
53 0         0 die 'Fork failed. Check your system resources.';
54             } elsif(not $pid) { #Child (producer)
55 252         2264 my $worker_count = 0;
56 252         1401 my $another_count = 0;
57             #first we spin off enough children to max out the count.
58 252         7268 for (1..$max_children) {
59 6630         114175 _worker(pop @work_units, $coderef, $Qc);
60 6518         170725 $worker_count++;
61             }
62              
63             #Now we should have $max_children processes. Wait for them
64             #to finish.
65 140         2094 while(1) {
66             #We are expecting one of:
67             # requests to spawn another worker from the consumer
68             # requests from workers to add objects to work list
69             # requests from workers to remove objects from work list
70            
71 3803         43381 my $package = $Qp->receive;
72 3803 50       10924 if($package->get_type eq 'another') {
    0          
    0          
73 3803         5252 $another_count++;
74 3803 100       17702 if($another_count == $worker_count) {
75             #We are now done.
76 6         113 $Qc->send(Proc::Swarm::Package->new(undef, 'end'));
77 6         1645 exit;
78             }
79            
80 3797 100       11093 if((scalar @work_units) != 0) {
81 3747         21300 _worker(pop(@work_units), $coderef, $Qc);
82 3613         183377 $worker_count++;
83             }
84             } elsif($package->get_type eq 'del') {
85             #find $package->get_object in @work_units and
86             #remove it
87 0         0 my @work_units_tmp;
88             my @new_work_units;
89 0         0 foreach my $work_object (@work_units) {
90 0 0       0 push @new_work_units, $work_object
91             unless $work_object eq $package->get_object;
92             }
93 0         0 undef @work_units;
94 0         0 foreach (@new_work_units) { push @work_units, $_; }
  0         0  
95              
96             } elsif($package->get_type eq 'new') {
97             #add $package->get_object into @work_units
98 0         0 push @work_units, $package->get_object;
99             }
100             }
101             } else { #Parent (consumer)
102 25         305 my @results;
103            
104 25         98 while(1) {
105             #We are expecting messages from the workers here.
106             #For each worker message, we want to record the result
107             #and inform the producer to spawn another worker.
108 205         5824 my $package = $Qc->receive;
109 205 100       1037 if($package->get_type eq 'res') {
    50          
110 180         667 push @results, $package->get_object;
111             #Tell the producer to spawn another worker.
112 180         2669 $Qp->send(
113             Proc::Swarm::Package->new(undef, 'another'));
114             } elsif($package->get_type eq 'end') {
115             #This is a message from the producer that
116             #it is finished spawning workers.
117              
118             #We will only get this message when we are
119             #sure all of the workers are finished.
120 25 100       226 if(defined($sort_output)) {
121 21         316 @results = _sort_results($sort_code, \@results, \@units);
122             }
123 25         154 $Qc->cleanup;
124 25         2174 $Qp->cleanup;
125 25         341 return Proc::Swarm::Results->new(@results);
126             }
127             }
128             }
129             }
130              
131             sub _sort_results {
132 21     21   63 my ($sort_code,$results_ref,$units_ref) = @_;
133              
134 21         102 my @units = @$units_ref;
135 21         43 my @results = @$results_ref;
136 21         41 my %sort_hash;
137 21         42 { my $i = 0;
  21         22  
138 21         106 %sort_hash = map { $units[$i], $i++ } @units;
  63         384  
139             }
140              
141 21 50       168 $sort_code = q(
142             sub { $sort_hash{$a->get_object}
143             <=>
144             $sort_hash{$b->get_object}
145             };
146             ) unless defined $sort_code;
147              
148 21         4427 my $sort_coderef = eval $sort_code;
149              
150 21         804 @results = sort $sort_coderef @results;
151 21         289 return @results;
152             }
153              
154             #this function should immediately return.
155             sub _worker {
156 10377     10377   26976 my ($object,$coderef,$Qc) = @_;
157              
158 10377         11014 my ($Qp,$pid);
159             #the classic double fork.
160 10377 100       5544038 unless ($pid = fork) {
161 246 100       144421 unless (fork) {
162 123         4555 _worker_worker($object, $coderef, $Qc, $Qp);
163 123         46014 exit 0;
164             }
165 123         43990 exit 0;
166             }
167 10131         2645360592 waitpid $pid,0;
168             }
169              
170             sub _worker_worker {
171 123     123   1960 my ($object,$coderef,$Qc,$Qp) = @_;
172 123         1125 my $start = scalar time;
173 123         885 my ($retval,$result_type);
174              
175 123         5848 eval {
176 123         4342 $retval = &$coderef($object);
177             };
178 123 100       1009060599 if($@) {
179 2         16 $result_type = 'error';
180 2         8 $retval = $@;
181             } else {
182 121         1916 $result_type = 'good';
183             }
184 123         945 my $end = scalar time;
185 123         3745 my $result = Proc::Swarm::Result->new(($end-$start), $object, $retval, $result_type);
186 123         1165 my $package = Proc::Swarm::Package->new($result, 'res');
187              
188 123         3321 $Qc->send($package);
189             }
190              
191             package Proc::Swarm::Package;
192             $Proc::Swarm::Package::VERSION = '1.161060';
193             sub new {
194 309     309   1652 my ($proto,$object,$type) = @_;
195              
196              
197 309   33     3548 my $class = ref($proto) || $proto;
198 309         1054 my $self = {};
199 309         1617 $self->{type} = $type;
200 309         617 $self->{obj} = $object;
201              
202 309         889 bless $self, $class;
203 309         1196 return $self;
204             }
205              
206             sub get_type {
207 4033     4033   5405 my $self = shift;
208 4033         15338 return $self->{type};
209             }
210              
211             sub get_object {
212 180     180   378 my $self = shift;
213 180         550 return $self->{obj};
214             }
215             package Proc::Swarm::Results;
216             $Proc::Swarm::Results::VERSION = '1.161060';
217             sub new {
218 25     25   31 my $proto = shift;
219 25   33     188 my $class = ref($proto) || $proto;
220              
221 25         87 my @results = @_;
222              
223 25         51 my $self = {};
224 25         102 $self->{results} = \@results;
225 25         33 bless $self, $class;
226 25         825 return $self;
227             }
228              
229             sub get_result_count {
230 0     0   0 my $self = shift;
231 0 0       0 return $self->{count} if defined $self->{count};
232 0         0 $self->{count} = scalar @{$self->{results}};
  0         0  
233 0         0 return $self->{count};
234             }
235              
236             sub get_result {
237 1     1   12 my $self = shift;
238 1         1 my $object_id = shift;
239              
240 1         2 foreach my $result (@{$self->{results}}) {
  1         8  
241 4 100       9 return $result
242             if $result->get_object eq $object_id;
243             }
244 0         0 return undef;
245             }
246              
247             sub get_result_objects {
248 23     23   496 my $self = shift;
249 23 50       184 return @{$self->{objects}} if defined $self->{objects};
  0         0  
250              
251 23         24 my @objects;
252 23         25 foreach my $result (@{$self->{results}}) {
  23         181  
253 173         244 push @objects, $result->get_result;
254             }
255 23         81 $self->{objects} = \@objects;
256 23         129 return @objects;
257             }
258              
259             sub get_results {
260 0     0   0 my $self = shift;
261 0         0 return @{$self->{results}};
  0         0  
262             }
263              
264             sub get_result_times {
265 1     1   25 my $self = shift;
266            
267 1 50       8 return @{$self->{times}} if defined $self->{times};
  0         0  
268              
269 1         1 my @times;
270 1         2 foreach my $result (@{$self->{results}}) {
  1         7  
271 3         5 push @times, $result->get_runtime;
272             }
273 1         8 $self->{times} = \@times;
274 1         11 return @times;
275             }
276              
277             sub get_objects {
278 0     0   0 my $self = shift;
279              
280 0         0 my @objects;
281 0         0 foreach my $result (@{$self->{results}}) {
  0         0  
282 0         0 push @objects, $result->get_object;
283             }
284              
285 0         0 return @objects;
286             }
287              
288             package Proc::Swarm::Result;
289             $Proc::Swarm::Result::VERSION = '1.161060';
290             sub new {
291 123     123   379 my $proto = shift;
292 123   33     3609 my $class = ref($proto) || $proto;
293              
294 123         938 my $self = {};
295             ( $self->{runtime},
296             $self->{object},
297             $self->{result},$self->{result_type}
298 123         4103 ) = @_;
299 123         835 bless $self, $class;
300 123         982 return $self;
301             }
302              
303             sub get_runtime {
304 3     3   3 my $self = shift;
305 3         4 return $self->{runtime};
306             }
307             sub
308             get_object {
309 128     128   148 my $self = shift;
310 128         1128 return $self->{object};
311             }
312              
313             sub get_result {
314 173     173   168 my $self = shift;
315 173         273 return $self->{result};
316             }
317              
318             sub get_result_type {
319 1     1   1 my $self = shift;
320 1         16 return $self->{result_type};
321             }
322              
323              
324             package Proc::Swarm::Queue;
325             $Proc::Swarm::Queue::VERSION = '1.161060';
326             sub new {
327 554     554   589 my $proto = shift;
328 554   33     1688 my $class = ref($proto) || $proto;
329              
330 258     258   1327 use IPC::SysV qw(IPC_PRIVATE S_IRWXU);
  258         285  
  258         43194  
331              
332 554         581 my $self = {};
333              
334 554         1666 $self->{Q} = IPC::Msg->new(IPC_PRIVATE, S_IRWXU);
335              
336 554         15221 bless $self, $class;
337 554         599 return $self;
338             }
339              
340             #We can't define a DESTROY method because this class goes out of scope a
341             #number of times before we actually want to remove the queues.
342             sub cleanup {
343 50     50   89 my $self = shift;
344 50         313 $self->{Q}->remove;
345             }
346              
347             sub send {
348 309     309   563 my ($self,$obj) = @_;
349 309         2440 my $frozen_obj = Storable::freeze($obj);
350 309         46080 return $self->{Q}->snd(1, $frozen_obj); #Message type '1'
351             }
352              
353             sub receive {
354 4008     4008   8574 my $self = shift;
355 4008         4283 my $in_buf;
356 4008         68283 my $thing = $self->{Q}->rcv($in_buf, 10240000);#This grabs any message type.
357 4008         476873358 my $thawed_thing = Storable::thaw $in_buf;
358 4008         134378 return $thawed_thing;
359             }
360              
361             1;
362              
363              
364             __END__