File Coverage

blib/lib/Net/ParSCP.pm
Criterion Covered Total %
statement 15 143 10.4
branch 0 60 0.0
condition 0 10 0.0
subroutine 5 15 33.3
pod 1 9 11.1
total 21 237 8.8


line stmt bran cond sub pod time code
1             package Net::ParSCP;
2 2     2   57018 use strict;
  2         5  
  2         75  
3 2     2   8 use warnings;
  2         4  
  2         54  
4              
5 2     2   5277 use IO::Select;
  2         4079  
  2         119  
6 2     2   3072 use Pod::Usage;
  2         128575  
  2         327  
7 2     2   1742 use Net::HostLanguage;
  2         8  
  2         3466  
8              
9             require Exporter;
10              
11             our @ISA = qw(Exporter);
12             our @EXPORT = qw(
13             parpush
14             exec_cssh
15             help
16             version
17             usage
18             $VERBOSE
19             $DRYRUN
20             );
21              
22             our $VERSION = '0.15';
23             our $DRYRUN = 0;
24              
25             ############################################################
26             sub version {
27 0     0 0   my $errmsg = shift;
28              
29 0           print "Version: $VERSION\n";
30 0           pod2usage(
31             -verbose => 99,
32             -sections => "AUTHOR|COPYRIGHT AND LICENSE",
33             -exitval => 0,
34             );
35             }
36              
37              
38             ############################################################
39             sub usage {
40 0     0 0   my $errmsg = shift;
41              
42 0           warn "$errmsg\n";
43 0           pod2usage(
44             -verbose => 99,
45             -sections => "NAME|SYNOPSIS|OPTIONS",
46             -exitval => 1,
47             );
48             }
49              
50             sub help {
51 0     0 0   pod2usage(
52             -verbose => 99,
53             -sections => "NAME|SYNOPSIS|OPTIONS",
54             -exitval => 0,
55             );
56             }
57              
58              
59             sub exec_cssh {
60 0     0 0   my @machines = @_;
61              
62 0           my $csshcommand = 'cssh ';
63 0           $csshcommand .= "$_ " for @machines;
64 0 0         warn "Executing system command:\n\t$csshcommand\n" if $VERBOSE;
65 0           my $pid;
66 0           exec("$csshcommand &");
67 0           die "Can't execute cssh\n";
68             }
69              
70             sub wait_for_answers {
71 0     0 0   my $readset = shift;
72 0           my %proc = %{shift()};
  0            
73              
74 0           my $np = keys %proc; # number of processes
75 0           my %output;
76             my @ready;
77              
78 0           my %result;
79 0           for (my $count = 0; $count < $np; ) {
80 0 0         push @ready, $readset->can_read unless @ready;
81 0           my $handle = shift @ready;
82              
83 0           my $name = $proc{0+$handle};
84              
85 0 0 0       unless (defined($name) && $name) {
86 0           warn "Error. Received message from unknown handle\n";
87 0           $name = 'unknown';
88             }
89              
90 0           my $partial = '';
91 0           my $numBytesRead;
92 0           $numBytesRead = sysread($handle, $partial, 65535, length($partial));
93              
94 0           $output{$name} .= $partial;
95              
96 0 0 0       if (defined($numBytesRead) && !$numBytesRead) {
97             # eof
98 0 0         if ($VERBOSE) {
99 0           print "$name output:\n";
100 0 0         $output{$name} =~ s/^/$name:/gm if length($output{$name});
101 0           print "$output{$name}\n";
102             }
103 0           $readset->remove($handle);
104 0           $count ++;
105 0 0         if (close($handle)) {
106 0           $result{$name} = 1;
107             }
108             else {
109 0 0         warn $! ? "Error closing scp to $name $!\n"
110             : "Exit status $? from scp to $name\n";
111 0 0         print "$output{$name}\n" unless $VERBOSE;
112 0           $result{$name} = 0;
113             }
114             }
115             }
116 0           return \%result;
117             }
118              
119             # parse_sourcefile: Find out what source machines are involved
120             # A hash %source is returned. Keys are the source machines.
121             # Values are the list of source paths
122             # machine => [ paths ]
123             # The special key '' (emtpy string) represents the local machine
124             {
125             my $nowhitenocolons = '(?:[^\s:]|\\\s)+'; # escaped spaces are allowed
126              
127             sub parse_sourcefile {
128 0     0 0   my $sourcefile = shift;
129              
130 0           my @externalmachines = $sourcefile =~ /($nowhitenocolons):($nowhitenocolons)/g;
131 0           my @localpaths = $sourcefile =~ /(?:^|\s) # begin or space
132             ($nowhitenocolons)
133             (?:\s|$) # end or space
134             /xg;
135            
136 0           my %source;
137 0 0         $source{''} = \@localpaths if @localpaths; # '' is the local machine
138 0           while (my ($clusterexp, $path) = splice(@externalmachines, 0, 2)) {
139 0 0         if (exists $source{$clusterexp} ) {
140 0           push @{$source{$clusterexp}}, $path;
  0            
141             }
142             else {
143 0           $source{$clusterexp} = [ $path ]
144             }
145             }
146 0           return %source;
147             }
148             }
149              
150             # Gives the same value for entries $entry1 and $entry2
151             # in the hash referenced by $rh
152             sub make_synonymous {
153 0     0 0   my ($rh, $entry1, $entry2, $defaultvalue) = @_;
154              
155 0 0         if (exists $rh->{$entry1}) {
    0          
156 0           $rh->{$entry2} = $rh->{$entry1}
157             }
158             elsif (exists $rh->{$entry2}) {
159 0           $rh->{$entry1} = $rh->{$entry2};
160             }
161             else {
162 0           $rh->{$entry1} = $rh->{$entry2} = $defaultvalue;
163             }
164             }
165              
166              
167             sub spawn_secure_copies {
168 0     0 0   my %arg = @_;
169 0           my $readset = $arg{readset};
170 0           my $configfile = $arg{configfile};
171 0           my $destination = $arg{destination};
172 0 0         my @destination = ref($destination)? @$destination : $destination;
173 0           my %cluster = %{$arg{cluster}};
  0            
174 0           my %method = %{$arg{method}};
  0            
175 0   0       my $scp = $arg{scp} || 'scp';
176 0   0       my $scpoptions = $arg{scpoptions} || '';
177 0           my $sourcefile = $arg{sourcefile};
178 0           my $name = $arg{name};
179              
180             # hash source: keys: source machines. values: lists of source paths for that machine
181 0           my (%pid, %proc, %source);
182              
183             my $sendfiles = sub {
184 0     0     my ($m, $cp) = @_;
185              
186             # @= is a macro and means "the name of the target machine"
187 0 0         my $targetname = exists($name->{$m}) ? $name->{$m} : $m;
188 0           $cp =~ s/@=/$targetname/g;
189              
190             # @# stands for source machine: decompose transfer
191 0           for my $sm (keys %source) {
192 0 0         my $sf = $sm? "$sm:@{$source{$sm}}" : "@{$source{$sm}}"; # $sm: source machine
  0            
  0            
193 0           my $fp = $cp; # $fp: path customized for this source machine
194              
195             # what if it is $sm eq '' the localhost?
196 0           my $sn = $sm;
197 0 0         $sn = $name->{$sm} if (exists $name->{$sm});
198 0           $fp =~ s/@#/$sn/g;
199              
200 0 0         my $target = ($m eq 'localhost')? $fp : "$m:$fp";
201 0 0         warn "Executing system command:\n\t$scp $scpoptions $sf $target\n" if $VERBOSE;
202 0 0         unless ($DRYRUN) {
203 0           my $pid = open(my $p, "$scp $scpoptions $sf $target 2>&1 |");
204 0 0         if (exists $pid{$m}) {
205 0           push @{$pid{$m}}, $pid;
  0            
206             }
207             else {
208 0           $pid{$m} = [ $pid ];
209             }
210              
211 0 0         warn "Can't execute scp $scpoptions $sourcefile $target", next unless defined($pid);
212              
213 0           $proc{0+$p} = $m;
214 0           $readset->add($p);
215             }
216             }
217 0           };
218              
219             # '' and 'localhost' are synonymous
220 0           make_synonymous($name, '', 'localhost', 'localhost');
221              
222 0 0         $VERBOSE++ if $DRYRUN;
223              
224             # @# stands for the source machine: decompose the transfer, one per source machine
225 0           %source = parse_sourcefile($sourcefile); # if "@destination" =~ /@#/;
226              
227             # expand clusters in sourcefile
228 0           for my $ce (keys %source) {
229 0 0         next unless $ce; # go ahead if local machine
230 0           my $set = translate($configfile, $ce, \%cluster, \%method);
231              
232             # leave it as it is if is a single node
233 0 0         next unless $set->members > 1;
234              
235 0           my $paths = $source{$ce};
236 0           $source{$_} = $paths for $set->members;
237 0           delete $source{$ce};
238             }
239              
240 0           for (@destination) {
241              
242 0           my ($clusterexp, $path);
243 0 0         unless (/^([^:]*):([^:]*)$/) {
244 0           warn "Error. Destination '$_' must have just one colon (:). Skipping transfer.\n";
245 0           next;
246             }
247              
248 0 0         if ($1) { # There is a target cluster expression
249 0           ($clusterexp, $path) = split /\s*:\s*/;
250              
251 0           my $set = translate($configfile, $clusterexp, \%cluster, \%method);
252 0 0         next unless $set;
253              
254 0           $sendfiles->($_, $path) for ($set->members);
255              
256             }
257             else { # No target cluster: target is the local machine
258 0           $path = $2;
259 0           $scpoptions .= '-r';
260 0           $sendfiles->('localhost', $path);
261             }
262             } # for @destination
263              
264 0           return (\%pid, \%proc);
265             }
266              
267             sub parpush {
268 0     0 1   my %arg = @_;
269              
270 0           my ($cluster, $method) = parse_configfile($arg{configfile});
271              
272 0           my $readset = IO::Select->new();
273              
274             # $proc is a hash ref. keys: memory address of some IO stream.
275             # Values the name of the assoc. machine.
276             # $pid is a hash ref
277             # keys: machine names. Values: process Ids
278 0           my ($pid, $proc) = spawn_secure_copies(
279             readset => $readset,
280             cluster => $cluster,
281             method => $method,
282             %arg,
283             );
284              
285 0           my $okh = {};
286 0 0         $okh = wait_for_answers($readset, $proc) unless $DRYRUN;;
287              
288 0 0         return wantarray? ($okh, $pid) : $okh;
289             }
290              
291             1;
292              
293             __END__