File Coverage

blib/lib/GRID/Cluster.pm
Criterion Covered Total %
statement 24 233 10.3
branch 0 42 0.0
condition 0 27 0.0
subroutine 8 22 36.3
pod 6 14 42.8
total 38 338 11.2


line stmt bran cond sub pod time code
1             package GRID::Cluster;
2              
3 1     1   29879 use strict;
  1         2  
  1         39  
4 1     1   4 use warnings;
  1         2  
  1         34  
5              
6 1     1   1400 use GRID::Machine;
  1         250177  
  1         67  
7 1     1   531 use GRID::Cluster::Result;
  1         3  
  1         30  
8 1     1   569 use GRID::Cluster::Handle;
  1         3  
  1         23  
9 1     1   12 use IO::Select;
  1         2  
  1         35  
10 1     1   6 use List::Util qw(sum);
  1         2  
  1         69  
11              
12             # Size of the buffer which is used to read from
13             # remote processes
14 1     1   5 use constant BUFFERSIZE => 2048;
  1         1  
  1         2376  
15              
16             our $VERSION = '0.04';
17              
18             # Constructor
19              
20             sub new {
21 0     0 1   my $class = shift;
22 0           my %opts = @_;
23              
24 0           my $self;
25 0 0 0       if (exists($opts{config}) && -r $opts{config}) {
26 0           %$self = do $opts{config};
27             }
28             else {
29 0           $self = {
30             debug => $opts{debug},
31             max_num_np => $opts{max_num_np},
32             hosts => {},
33             };
34             }
35              
36 0           my @proposed_names = keys %{$self->{max_num_np}};
  0            
37 0           $self->{host_names} = [];
38              
39            
40              
41 0           my $logic_id = 0;
42            
43 0           for (@proposed_names) {
44 0           eval{
45 0           $self->{hosts}{$_} = GRID::Machine->new (
46             host => $_,
47             debug => $self->{debug}{$_},
48             logic_id => $logic_id,
49             );
50             };
51              
52 0 0         if ($@) {
53 0           warn "Warning: Host $_ has not been initialized: $@";
54 0           delete $self->{debug}{$_};
55 0           delete $self->{max_num_np}{$_};
56 0           delete $self->{hosts}{$_};
57             }
58             else {
59 0           push @{$self->{host_names}}, $_;
  0            
60 0           $logic_id++;
61             }
62             }
63              
64 0           bless $self, $class;
65 0 0         return $self if (@{$self->{host_names}});
  0            
66 0           return undef;
67             }
68              
69             # Getters
70              
71             sub get_host {
72 0     0 0   my ($self, $host_name) = @_;
73 0 0         return $self->{hosts}{$host_name} if defined ($self->{hosts}{$host_name});
74             }
75              
76             sub get_max_np {
77 0     0 0   my $self = shift;
78 0           return sum values %{$self->{max_num_np}};
  0            
79             }
80              
81             sub get_num_machines {
82 0     0 0   my $self = shift;
83 0           my $num = @{$self->{host_names}};
  0            
84 0           return $num;
85             }
86              
87             sub get_machine_names {
88 0     0 0   my $self = shift;
89 0           return keys %{$self->{max_num_np}};
  0            
90             }
91              
92             # Methods
93              
94             sub copyandmake {
95 0     0 1   my $self = shift;
96 0           my %opts = @_;
97              
98 0           my $r = GRID::Cluster::Result->new();
99              
100 0           for (@{$self->{host_names}}) {
  0            
101 0           my $machine_result = $self->{hosts}{$_}->copyandmake(%opts);
102 0           $r->add(host_name => $_, machine_result => $machine_result);
103             }
104              
105 0           return $r;
106             }
107              
108             sub chdir {
109 0     0 1   my ($self, $dir) = @_;
110              
111 0           my $r = GRID::Cluster::Result->new();
112              
113 0           for (@{$self->{host_names}}) {
  0            
114 0           my $machine_result = $self->{hosts}{$_}->chdir($dir);
115 0           $r->add(host_name => $_, machine_result => $machine_result);
116             }
117              
118 0           return $r;
119             }
120              
121             sub modput {
122 0     0 1   my $self = shift;
123 0           my @modules = @_;
124              
125 0           my $r = GRID::Cluster::Result->new();
126              
127 0           for (@{$self->{host_names}}) {
  0            
128 0           my $machine_result = $self->{hosts}{$_}->modput(@modules);
129 0           $r->add(host_name => $_, machine_result => $machine_result);
130             }
131              
132 0           return $r;
133             }
134              
135             sub eval {
136 0     0 1   my $self = shift;
137 0           my ($code, @args) = @_;
138              
139 0           my $r = GRID::Cluster::Result->new();
140              
141             # First round: Send eval operations to each machine
142 0           for (@{$self->{host_names}}) {
  0            
143 0           $self->{hosts}{$_}->send_operation("GRID::Machine::EVAL", $code, \@args);
144             }
145              
146             # Second round: Receive different results from each machine
147 0           for (@{$self->{host_names}}) {
  0            
148 0           my $machine_result = $self->{hosts}{$_}->_get_result();
149 0           $r->add(host_name => $_, machine_result => $machine_result);
150             }
151              
152 0           return $r;
153             }
154              
155             sub qx {
156 0     0 1   my $self = shift;
157 0           my @commands = map { "$_ | " } @_;
  0            
158              
159 0           my @proc;
160             my @pid;
161 0           my %map_id_machine;
162 0           my %id;
163              
164 0           my $counter = 0;
165            
166 0           my $np = @commands;
167 0           my $lp = $np - 1;
168 0           my $readset = IO::Select->new();
169              
170 0           for (@{$self->{host_names}}) {
  0            
171 0           for my $actual_proc (0 .. $self->{max_num_np}{$_} - 1) {
172 0           my $m = $self->get_host($_);
173 0           ($proc[$counter], $pid[$counter]) = $m->open(shift @commands);
174 0           $proc[$counter]->blocking(1);
175              
176 0           $map_id_machine{$counter} = $_;
177 0           $readset->add($proc[$counter]);
178 0           my $address = 0 + $proc[$counter];
179 0           $id{$address} = $counter;
180              
181 0           $counter++;
182              
183             # See if all workers are busy, if so wait for one to finish
184 0 0 0       last if (($counter > $lp) || ($counter >= $self->get_max_np()));
185             }
186 0 0 0       last if (($counter > $lp) || ($counter >= $self->get_max_np()));
187             }
188            
189 0           my $count = 0;
190 0           my @ready;
191             my @result;
192              
193 0           do {
194 0 0         push @ready, $readset->can_read unless @ready;
195              
196 0           my $handle = shift @ready;
197              
198 0           my $me = $id{0 + $handle};
199              
200 0           my ($aux, $bytes, $r);
201              
202 0   0       while ((!defined($bytes)) || ($bytes)) {
203 0           $bytes = sysread($handle, $aux, BUFFERSIZE);
204 0 0 0       $r .= $aux if ((defined($bytes)) && ($bytes));
205             }
206              
207 0           $result[$me] = $r;
208              
209 0 0         $readset->remove($handle) if eof($handle);
210              
211 0           close $handle;
212              
213 0 0         if (@commands) {
214 0           my $m = $self->get_host($map_id_machine{$me});
215 0           ($proc[$counter], $pid[$counter]) = $m->open(shift @commands);
216 0           $proc[$counter]->blocking(1);
217              
218 0           $map_id_machine{$counter} = $map_id_machine{$me};
219 0           $readset->add($proc[$counter]);
220 0           my $address = 0 + $proc[$counter];
221 0           $id{$address} = $counter;
222              
223 0           $counter++;
224             }
225              
226             } until (++$count == $np);
227              
228 0           my @results;
229 0           my $i = 0;
230              
231 0 0         if (wantarray) {
232 0           foreach (@result) {
233 0           $results[$i] = [ split /\n/, $_ ];
234 0           $i++;
235             }
236 0           return @results;
237             }
238 0           return \@result;
239             }
240              
241             sub open {
242 0     0 0   my ($self, @command) = @_;
243              
244 0           my @proc;
245             my @pid;
246 0           my %map_id_machine;
247 0           my %id;
248              
249 0           my $counter = 0;
250 0           my $np = @command;
251 0           my $lp = $np - 1;
252 0           my $readset = IO::Select->new();
253              
254 0           for (@{$self->{host_names}}) {
  0            
255 0           for my $actual_proc (0 .. $self->{max_num_np}{$_} - 1) {
256 0           my $m = $self->get_host($_);
257 0           ($proc[$counter], $pid[$counter]) = $m->open($command[$counter]);
258 0           $proc[$counter]->blocking(1);
259              
260 0           $map_id_machine{$counter} = $_;
261 0           $readset->add($proc[$counter]);
262 0           my $address = 0 + $proc[$counter];
263 0           $id{$address} = $counter;
264              
265 0 0         last if (++$counter > $lp);
266             }
267 0 0         last if ($counter > $lp);
268             }
269              
270 0           return GRID::Cluster::Handle->new (
271             readset => $readset,
272             proc => \@proc,
273             pid => \@pid,
274             id => \%id,
275             map_id_machine => \%map_id_machine
276             );
277             }
278              
279             sub open2 {
280 0     0 0   my ($self, @command) = @_;
281              
282 0           my @rproc;
283             my @wproc;
284 0           my @pid;
285 0           my %map_id_machine;
286 0           my %id;
287              
288 0           my $counter = 0;
289 0           my $np = @command;
290 0           my $lp = $np - 1;
291 0           my $readset = IO::Select->new();
292              
293 0           for (@{$self->{host_names}}) {
  0            
294 0           for my $actual_proc (0 .. $self->{max_num_np}{$_} - 1) {
295 0           my $m = $self->get_host($_);
296 0           $wproc[$counter] = IO::Handle->new();
297 0           $rproc[$counter] = IO::Handle->new();
298 0           $pid[$counter] = $m->open2($rproc[$counter], $wproc[$counter], $command[$counter]);
299              
300 0           $map_id_machine{$counter} = $_;
301 0           $readset->add($rproc[$counter]);
302 0           my $address = 0 + $rproc[$counter];
303 0           $id{$address} = $counter;
304              
305 0 0         last if (++$counter > $lp);
306             }
307 0 0         last if ($counter > $lp);
308             }
309              
310 0           return GRID::Cluster::Handle->new (
311             readset => $readset,
312             rproc => \@rproc,
313             wproc => \@wproc,
314             pid => \@pid,
315             id => \%id,
316             map_id_machine => \%map_id_machine
317             );
318             }
319              
320             sub close {
321 0     0 0   my ($self, $cluster_handle) = @_;
322              
323 0           my $readset = $cluster_handle->get_readset();
324 0           my %id = %{$cluster_handle->get_id()};
  0            
325 0           my $np = values(%id);
326            
327 0           my $count = 0;
328 0           my @ready;
329             my @result;
330              
331 0           do {
332 0 0         push @ready, $readset->can_read unless @ready;
333              
334 0           my $handle = shift @ready;
335              
336 0           my $me = $id{0 + $handle};
337              
338 0           my ($aux, $bytes, $r);
339              
340 0   0       while ((!defined($bytes)) || ($bytes)) {
341 0           $bytes = sysread($handle, $aux, BUFFERSIZE);
342 0 0 0       $r .= $aux if ((defined($bytes)) && ($bytes));
343             }
344              
345 0           $result[$me] = $r;
346              
347 0 0         $readset->remove($handle) if eof($handle);
348              
349 0           close $handle;
350              
351             } until (++$count == $np);
352              
353 0           return \@result;
354             }
355              
356             sub close2 {
357 0     0 0   my ($self, $cluster_handle) = @_;
358              
359 0           my $readset = $cluster_handle->get_readset();
360 0           my @wproc = @{$cluster_handle->get_wproc()};
  0            
361 0           my %id = %{$cluster_handle->get_id()};
  0            
362 0           my $np = values(%id);
363              
364 0           my $count = 0;
365 0           my @ready;
366             my @result;
367              
368 0           do {
369 0 0         push @ready, $readset->can_read unless @ready;
370              
371 0           my $handle = shift @ready;
372              
373 0           my $me = $id{0 + $handle};
374              
375 0           my ($aux, $bytes, $r);
376              
377 0   0       while ((!defined($bytes)) || ($bytes)) {
378 0           $bytes = sysread($handle, $aux, BUFFERSIZE);
379 0 0 0       $r .= $aux if ((defined($bytes)) && ($bytes));
380             }
381              
382 0           $result[$me] = $r;
383              
384 0 0         $readset->remove($handle) if eof($handle);
385              
386 0           $handle->close();
387 0           $wproc[$count]->close();
388              
389             } until (++$count == $np);
390              
391 0           return \@result;
392             }
393              
394             1;
395             __END__