File Coverage

blib/lib/GRID/Machine/Group.pm
Criterion Covered Total %
statement 27 152 17.7
branch 0 38 0.0
condition 0 12 0.0
subroutine 9 23 39.1
pod 6 7 85.7
total 42 232 18.1


line stmt bran cond sub pod time code
1             package GRID::Machine::Group;
2 2     2   11493 use warnings;
  2         6  
  2         71  
3 2     2   11 use strict;
  2         6  
  2         67  
4 2     2   10 use List::Util qw(first);
  2         3  
  2         199  
5 2     2   14 use Scalar::Util qw(reftype);
  2         4  
  2         82  
6 2     2   1716 use IO::Select;
  2         3308  
  2         97  
7 2     2   13 use base qw{Exporter};
  2         3  
  2         698  
8              
9             our @EXPORT_OK = qw{void};
10              
11             sub new {
12 0     0 1   my $class = shift;
13 0           my %args = @_;
14              
15 0           my @machines = @{$args{cluster}};
  0            
16 0 0         @machines = map { ref($_)? $_ : GRID::Machine->new(host => $_, survive => 1) } @machines;
  0            
17              
18            
19 0           my $s = IO::Select->new();
20 0           my %rpipe2gm = map { (0+$_->readpipe, $_) } @machines;
  0            
21 0           my %wpipe2gm = map { (0+$_->writepipe, $_) } @machines;
  0            
22 0           for (@machines) {
23 0           $s->add($_->readpipe);
24 0           $s->add($_->writepipe);
25             }
26              
27 0           my $self = {
28             machines => [ @machines ],
29             select => $s,
30            
31             rpipe => \%rpipe2gm,
32             wpipe => \%wpipe2gm, # keys: write pipe addresses. Values: GRID machines
33             };
34            
35 0           my $clusterclass = "$class"."::".(0+$self);
36              
37 0           bless $self, $clusterclass;
38              
39 0           my $misa;
40             {
41 2     2   11 no strict 'refs';
  2         3  
  2         2305  
  0            
42 0           $misa = \@{"${clusterclass}::ISA"};
  0            
43             }
44              
45 0           unshift @{$misa}, 'GRID::Machine::Group'
46 0 0   0     unless first { $_ eq 'GRID::Machine::Group' } @{$misa};
  0            
  0            
47              
48 0           $self;
49             }
50              
51             sub call {
52 0     0 1   calloreval('GRID::Machine::CALL', @_);
53             }
54              
55             sub eval {
56 0     0 1   calloreval('GRID::Machine::EVAL', @_);
57             }
58              
59             sub calloreval {
60 0     0 0   my $protocol = shift;
61 0           my $self = shift;
62 0           my $name = shift;
63 0           my %ARG = @_;
64              
65 0           my $arg = $ARG{args};
66              
67 0           my ($next, $thereareargs, $reset);
68              
69 0 0         unless (@{$self->{machines}}) {
  0            
70 0           warn "Warning! Attempt to execute '$name' in an empty cluster!";
71 0           return;
72             }
73              
74             # replicate is ignored if 'arg' is defined
75 0 0         unless (defined($arg)) {
76 0           my $rep = $ARG{replicate};
77 0           my $rt = reftype($rep);
78 0 0         die "GRID::Machine::Group::call error. Unexpected arguments" unless $rt;
79 0 0         if ($rt eq 'ARRAY') {
    0          
80 0           push @$arg, $rep for @{$self->{machines}};
  0            
81             }
82             elsif ($rt eq 'CODE') {
83 0           for ( @{$self->{machines}}) {
  0            
84 0           my $r = $rep->($_);
85 0 0 0       $r = [ $r ] unless reftype($r) and (reftype($r) eq 'ARRAY');
86 0           push @$arg, $r;
87             }
88             }
89             else {
90 0           die "GRID::Machine::Group::call error. Unexpected arguments";
91             }
92             }
93              
94 0           my $rt = reftype($arg);
95 0 0         if ($rt) {
96 0 0         if ($rt eq 'ARRAY') {
    0          
97 0           my @args = @$arg;
98 0     0     $next = sub { shift @args };
  0            
99 0 0   0     $thereareargs = sub { @args ? 1 : 0 };
  0            
100 0     0     $reset = sub {};
  0            
101             }
102             elsif ($rt eq 'HASH') {
103 0           $next = $arg->{next};
104 0           $thereareargs = $arg->{thereareargs};
105 0           $reset = $arg->{reset};
106             }
107             else {
108 0           die "GRID::Machine::Group::call error. Unexpected arguments";
109             }
110             }
111             else { # not a ref
112 0           die "GRID::Machine::Group::call error. Unexpected arguments";
113             }
114              
115 0           my %t;
116 0           my $task = 0;
117 0           $reset->();
118 0           for (@{$self->{machines}}) {
  0            
119 0           my ($args) = $next->(); # shift @_;
120 0 0 0       $args = [ $args] unless (ref($args) and (reftype($args) eq 'ARRAY'));
121              
122 0           $_->send_operation( $protocol, $name, $args );
123 0           $t{0+$_} = $task++;
124              
125 0 0         last unless $thereareargs->(); # @_; # Number of jobs is less than the number of machines
126             }
127              
128 0           my $readset = $self->{select};
129              
130 0           my @ready;
131             my @result;
132 0           my $finished = 0;
133 0   0       do {
134 0 0         push @ready, $readset->can_read unless @ready;
135 0           my $handle = shift @ready;
136              
137 0           my $me = $self->{rpipe}{0+$handle};
138              
139 0           my $index = $t{0+$me};
140 0           $result[$index] = $me->_get_result();
141 0           $finished++;
142              
143 0 0         if ($thereareargs->()) {
144 0           my ($args) = $next->(\@result, $index);
145 0 0 0       $args = [ $args] unless (ref($args) and (reftype($args) eq 'ARRAY'));
146              
147 0           $t{0+$me} = $task++;
148 0           $me->send_operation( $protocol, $name, $args );
149             }
150             #print "Tasks left = '@_' Task = $task, finished = $finished\n";
151            
152             } while ($thereareargs->() or ($finished < $task));
153 0           $reset->();
154              
155 0           return bless \@result, 'GRID::Machine::Group::Result';
156             }
157              
158             sub sub {
159 0     0 1   my $self = shift;
160              
161 0 0         warn "Warning!: Attempt to install sub '$_[0]' in an empty cluster" unless @{$self->{machines}};
  0            
162 0           my @r;
163 0           push @r, $_->sub(@_) for @{$self->{machines}};
  0            
164              
165             #install the par method proxy
166 0           my $name = shift;
167 0     0     my $sub = sub { my $self = shift; $self->call( $name, @_ ) };
  0            
  0            
168            
169 0           my $class = ref($self);
170 2     2   14 no strict 'refs';
  2         5  
  2         398  
171 0           *{$class."::".$name} = $sub;
  0            
172              
173 0           return @r;
174             }
175              
176             sub makemethod {
177 0     0 1   my $self = shift;
178              
179 0 0         warn "Warning!: Attempt to install makemethod '$_[0]' in an empty cluster" unless @{$self->{machines}};
  0            
180 0           my @r;
181 0           push @r, $_->makemethod(@_) for @{$self->{machines}};
  0            
182              
183             #install the par method proxy
184 0           my $name = shift;
185 0     0     my $sub = sub { my $self = shift; $self->call( $name, @_ ) };
  0            
  0            
186            
187 0           my $class = ref($self);
188 2     2   11 no strict 'refs';
  2         5  
  2         291  
189 0           *{$class."::".$name} = $sub;
  0            
190              
191 0           return @r;
192             }
193              
194 0     0 1   sub void { return (replicate => []) }
195              
196             package GRID::Machine::Group::Result;
197              
198             sub Results {
199 0     0     my $self = shift;
200              
201 0           return map { $_->result } @$self;
  0            
202             }
203              
204             1;