File Coverage

blib/lib/urpm/parallel.pm
Criterion Covered Total %
statement 4 6 66.6
branch n/a
condition n/a
subroutine 2 2 100.0
pod n/a
total 6 8 75.0


line stmt bran cond sub pod time code
1             package urpm::parallel;
2              
3 1     1   2836 use strict;
  1         4  
  1         40  
4 1     1   46 use urpm;
  0            
  0            
5             use urpm::util qw(any basename cat_);
6             use urpm::msg;
7              
8             =head1 NAME
9              
10             urpm::parallel - Run an urpmi command in parallel on a number of hosts
11              
12             =head1 SYNOPSIS
13              
14             This enables to run urpmi command on several computers at once.
15              
16             Two backends are available: L and L
17              
18             =head1 DESCRIPTION
19              
20             =over
21              
22             =cut
23              
24              
25             sub configure {
26             my ($urpm, $alias) = @_;
27             my @parallel_options;
28             #- read parallel configuration
29             foreach (cat_("/etc/urpmi/parallel.cfg")) {
30             chomp; s/#.*$//; s/^\s*//; s/\s*$//;
31             /\s*([^:]*):(.*)/ or $urpm->{error}(N("unable to parse \"%s\" in file [%s]", $_, "/etc/urpmi/parallel.cfg")), next;
32             $1 eq $alias and push @parallel_options, $2;
33             }
34             #- if a configuration option has been found, use it; else fatal error.
35             my $parallel_handler;
36             if (@parallel_options) {
37             foreach my $dir (grep { -d $_ } map { "$_/urpm" } @INC) {
38             foreach my $pm (grep { -f $_ } glob("$dir/parallel_*.pm")) {
39             #- load parallel modules
40             $urpm->{log}->(N("examining parallel handler in file [%s]", $pm));
41             # perl_checker: require urpm::parallel_ka_run
42             # perl_checker: require urpm::parallel_ssh
43             eval { require $pm; $parallel_handler = $urpm->handle_parallel_options(join("\n", @parallel_options)) };
44             $parallel_handler and last;
45             }
46             $parallel_handler and last;
47             }
48             }
49             if ($parallel_handler) {
50             if ($parallel_handler->{nodes}) {
51             $urpm->{log}->(N("found parallel handler for nodes: %s", join(', ', keys %{$parallel_handler->{nodes}})));
52             }
53             $urpm->{parallel_handler} = $parallel_handler;
54             } else {
55             $urpm->{fatal}(1, N("unable to use parallel option \"%s\"", $alias));
56             }
57             }
58              
59             sub resolve_dependencies {
60             my ($urpm, $state, $requested, %options) = @_;
61              
62             #- build the global synthesis file first.
63             my $file = "$urpm->{cachedir}/partial/parallel.cz";
64             unlink $file;
65             foreach (@{$urpm->{media}}) {
66             urpm::media::is_valid_medium($_) or next;
67             my $f = urpm::media::any_synthesis($urpm, $_);
68             system "cat '$f' >> '$file'";
69             }
70             #- let each node determine what is requested, according to handler given.
71             $urpm->{parallel_handler}->parallel_resolve_dependencies($file, $urpm, $state, $requested, %options);
72             }
73              
74             =item remove($urpm, $remove, %options)
75              
76             remove packages from node as remembered according to resolving done.
77              
78             =cut
79              
80             sub remove {
81             my ($urpm, $remove, %options) = @_;
82             my $state = {};
83             my $callback = sub { $urpm->{fatal}(1, "internal distributed remove fatal error") };
84             $urpm->{parallel_handler}->parallel_find_remove($urpm, $state, $remove, %options,
85             callback_notfound => undef,
86             callback_fuzzy => $callback,
87             callback_base => $callback,
88             );
89             }
90              
91             =item parallel_find_remove($parallel, $urpm, $state, $l, %options)
92              
93             parallel find_packages_to_remove
94              
95             =cut
96              
97             sub parallel_find_remove {
98             my ($parallel, $urpm, $state, $l, %options) = @_;
99              
100             my ($test, $pkgs) = _find_remove_pre($urpm, $state, %options);
101             $pkgs and return @$pkgs;
102              
103             my (%bad_nodes, %base_to_remove, %notfound);
104              
105             #- now try an iteration of urpme.
106             $parallel->urpm_popen($urpm, 'urpme', "--auto $test" . join(' ', map { "'$_'" } @$l) . ' 2>&1', sub {
107             my ($node, $s) = @_;
108              
109             _parse_urpme_output($urpm, $state, $node, $s,
110             \%notfound, \%base_to_remove, \%bad_nodes, %options);
111             });
112              
113             #- check base, which has been delayed until there.
114             if ($options{callback_base} && %base_to_remove) {
115             $options{callback_base}->($urpm, keys %base_to_remove) or return ();
116             }
117              
118             #- build error list contains all the error returned by each node.
119             $urpm->{error_remove} = [ map {
120             my $msg = N("on node %s", $_);
121             map { "$msg, $_" } @{$bad_nodes{$_}};
122             } keys %bad_nodes ];
123              
124             #- if at least one node has the package, it should be seen as unknown...
125             delete @notfound{map { /^(.*)-[^-]*-[^-]*$/ } keys %{$state->{rejected}}};
126             if (%notfound) {
127             $options{callback_notfound} && $options{callback_notfound}->($urpm, keys %notfound)
128             or delete $state->{rejected};
129             }
130              
131             keys %{$state->{rejected}};
132             }
133              
134              
135             =item parallel_register_rpms($parallel, $urpm, @files)
136              
137             parallel copy
138              
139             =cut
140              
141             sub parallel_register_rpms {
142             my ($parallel, $urpm, @files) = @_;
143              
144             $parallel->copy_to_dir($urpm, @files, "$urpm->{cachedir}/rpms");
145              
146             #- keep trace of direct files.
147             $parallel->{line} .=
148             join(' ',
149             map { "'$_'" }
150             map { "$urpm->{cachedir}/rpms/" . basename($_) } @files);
151             }
152              
153             sub _find_remove_pre {
154             my ($urpm, $state, %options) = @_;
155              
156             #- keep in mind if the previous selection is still active, it avoids
157             #- to re-start urpme --test on each node.
158             if ($options{find_packages_to_remove}) {
159             delete $state->{rejected};
160             delete $urpm->{error_remove};
161             '--test ';
162             } elsif (@{$urpm->{error_remove} || []}) {
163             undef, $urpm->{error_remove};
164             } elsif ($options{test}) {
165             #- no need to restart what has been started before.
166             undef, [ keys %{$state->{rejected}} ];
167             } else {
168             '--force ';
169             }
170             }
171              
172             sub _parse_urpme_output {
173             my ($urpm, $state, $node, $s, $notfound, $base_to_remove, $bad_nodes, %options) = @_;
174              
175             if ($s =~ /^\s*$/) {
176             } elsif ($s =~ /unknown packages?:? (.*)/) {
177             #- remember unknown packages from the node, because it should not be a fatal error
178             #- if other nodes have it.
179             $notfound->{$_} = undef foreach split ", ", $1;
180             } elsif ($s =~ /The following packages contain ([^:]*): (.*)/) {
181             $options{callback_fuzzy} && $options{callback_fuzzy}->($urpm, $1, split(" ", $2))
182             or delete($state->{rejected}), return 'stop_parse';
183             } elsif ($s =~ /removing package (.*) will break your system/) {
184             $base_to_remove->{$1} = undef;
185             } elsif ($s =~ /^(removing|testing removal of) (.*)/) {
186             foreach my $fn (split ' ', $2) {
187             $state->{rejected}{$fn}{removed} = 1;
188             $state->{rejected}{$fn}{nodes}{$node} = undef;
189             }
190             } elsif ($s =~ /Remov(?:al|ing) failed/) {
191             $bad_nodes->{$node} = [];
192             } elsif (exists $bad_nodes->{$node}) {
193             $s =~ /^\s+(.+)/ and push @{$bad_nodes->{$node}}, $1;
194             }
195             return;
196             }
197              
198             sub _parse_urpmq_output {
199             my ($urpm, $state, $node, $s, $cont, $chosen, %options) = @_;
200              
201             chomp $s;
202              
203             if (my ($action, $what) = $s =~ /^\@([^\@]*)\@(.*)/) {
204             if ($action eq 'removing') {
205             $state->{rejected}{$what}{removed} = 1;
206             $state->{rejected}{$what}{nodes}{$node} = undef;
207             }
208             } elsif ($s =~ /\|/) {
209             #- distant urpmq returned a choices, check if it has already been chosen
210             #- or continue iteration to make sure no more choices are left.
211             $$cont ||= 1; #- invalid transitory state (still choices is strange here if next sentence is not executed).
212             unless (any { exists $chosen->{$_} } split /\|/, $s) {
213             my $choice = $options{callback_choices}->($urpm, undef, $state, [ map { $urpm->search($_) } split /\|/, $s ]);
214             if ($choice) {
215             $chosen->{scalar $choice->fullname} = $choice;
216             #- it has not yet been chosen so need to ask user.
217             $$cont = 2;
218             } else {
219             #- no choices resolved, so forget it (no choices means no choices at all).
220             $$cont = 0;
221             }
222             }
223             } else {
224             my $pkg = $urpm->search($s) or return; #TODO
225             $state->{selected}{$pkg->id}{$node} = $s;
226             }
227             }
228              
229             =item parallel_resolve_dependencies($parallel, $synthesis, $urpm, $state, $requested, %options)
230              
231             parallel resolve_dependencies on each node
232              
233             =cut
234              
235             sub parallel_resolve_dependencies {
236             my ($parallel, $synthesis, $urpm, $state, $requested, %options) = @_;
237              
238             #- first propagate the synthesis file to all machines
239             $parallel->propagate_file($urpm, $synthesis);
240              
241             $parallel->{synthesis} = $synthesis;
242              
243             my $line = _simple_resolve_dependencies($parallel, $urpm, $state, $requested, %options);
244              
245             #- execute urpmq to determine packages to install.
246             my ($cont, %chosen);
247             do {
248             $cont = 0; #- prepare to stop iteration.
249             #- the following state should be cleaned for each iteration.
250             delete $state->{selected};
251             #- now try an iteration of urpmq.
252             my @errors = $parallel->urpm_popen($urpm, 'urpmq', "--synthesis $synthesis -fmc $line " . join(' ', keys %chosen), sub {
253             my ($node, $s) = @_;
254             _parse_urpmq_output($urpm, $state, $node, $s, \$cont, \%chosen, %options);
255             undef;
256             });
257             @errors and $urpm->{fatal}(1, join("\n", @errors));
258             #- check for internal error of resolution.
259             $cont == 1 and die "internal distant urpmq error on choice not taken";
260             } while $cont;
261              
262             #- keep trace of what has been chosen finally (if any).
263             $parallel->{line} = join(' ', $line, keys %chosen);
264             }
265              
266             =item _simple_resolve_dependencies($parallel, $urpm, $state, $requested, %options)
267              
268             Compute command line of urpm? tools
269              
270             =cut
271              
272             sub _simple_resolve_dependencies {
273             my ($parallel, $urpm, $state, $requested, %options) = @_;
274              
275             my @pkgs;
276             foreach (keys %$requested) {
277             if (/\|/) {
278             #- taken from URPM::Resolve to filter out choices, not complete though.
279             my @packages = $urpm->find_candidate_packages_($_);
280             foreach (@packages) {
281             my ($best_requested, $best);
282             foreach (@$_) {
283             exists $state->{selected}{$_->id} and $best_requested = $_, last;
284             if ($best_requested) {
285             if ($best_requested && $best_requested != $_) {
286             $_->compare_pkg($best_requested) > 0 and $best_requested = $_;
287             } else {
288             $best_requested = $_;
289             }
290             } elsif ($best && $best != $_) {
291             $_->compare_pkg($best) > 0 and $best = $_;
292             } else {
293             $best = $_;
294             }
295             }
296             $_ = $best_requested || $best;
297             }
298             #- simplified choice resolution.
299             my $choice = $options{callback_choices}->($urpm, undef, $state, \@packages);
300             if ($choice) {
301             push @pkgs, $choice;
302             }
303             } else {
304             my $pkg = $urpm->{depslist}[$_] or next;
305             push @pkgs, $pkg;
306             }
307             }
308             #- local packages have already been added.
309             @pkgs = grep { !$urpm->{source}{$_->id} } @pkgs;
310              
311             $parallel->{line} .
312             ($options{auto_select} ? ' --auto-select' : '') .
313             ($options{keep} ? ' --keep' : '') .
314             join(' ', map { scalar $_->fullname } @pkgs);
315             }
316              
317             sub parallel_install {
318             my ($parallel, $urpm, undef, $install, $upgrade, %options) = @_;
319              
320             $parallel->copy_to_dir($urpm, values %$install, values %$upgrade, "$urpm->{cachedir}/rpms");
321              
322             my (%bad_nodes, @good_nodes);
323             $parallel->urpm_popen($urpm, 'urpmi', "--pre-clean --test --no-verify-rpm --auto --synthesis $parallel->{synthesis} $parallel->{line}", sub {
324             my ($node, $s) = @_;
325             $s =~ /^\s*$/ and return;
326             $bad_nodes{$node} .= "$s\n";
327             $s =~ /Installation failed/ and $bad_nodes{$node} = '';
328             $s =~ /Installation is possible|Packages are up to date/ and push @good_nodes, $node;
329             undef;
330             });
331             delete $bad_nodes{$_} foreach @good_nodes;
332              
333             foreach (keys %{$parallel->{nodes}}) {
334             exists $bad_nodes{$_} or next;
335             $urpm->{error}(N("Installation failed on node %s", $_) . ":\n" . $bad_nodes{$_});
336             }
337             %bad_nodes and return;
338              
339             if ($options{test}) {
340             $urpm->{error}(N("Installation is possible"));
341             1;
342             } else {
343             my $line = $parallel->{line} . ($options{excludepath} ? " --excludepath '$options{excludepath}'" : "");
344             #- continue installation.
345             $parallel->run_urpm_command($urpm, 'urpmi', "--no-verify-rpm --auto --synthesis $parallel->{synthesis} $line");
346             }
347             }
348              
349             1;
350              
351             =back
352              
353             =head1 COPYRIGHT
354              
355             Copyright (C) 2005 MandrakeSoft SA
356              
357             Copyright (C) 2005-2010 Mandriva SA
358              
359             Copyright (C) 2011-2017 Mageia
360              
361             =cut