File Coverage

blib/lib/Net/OpenSSH/Parallel.pm
Criterion Covered Total %
statement 24 504 4.7
branch 0 328 0.0
condition 0 64 0.0
subroutine 8 41 19.5
pod 7 7 100.0
total 39 944 4.1


line stmt bran cond sub pod time code
1             package Net::OpenSSH::Parallel;
2              
3             our $VERSION = '0.12';
4              
5 1     1   51240 use strict;
  1         3  
  1         45  
6 1     1   5 use warnings;
  1         1  
  1         34  
7 1     1   5 use Carp qw(croak carp);
  1         6  
  1         87  
8              
9 1     1   2231 use Net::OpenSSH;
  1         49146  
  1         53  
10 1     1   748 use Net::OpenSSH::Parallel::Constants qw(:error :on_error);
  1         2  
  1         203  
11              
12 1     1   5 use POSIX qw(WNOHANG);
  1         1  
  1         8  
13 1     1   1049 use Time::HiRes qw(time);
  1         1980  
  1         5  
14 1     1   207 use Scalar::Util qw(dualvar);
  1         2  
  1         7602  
15              
16             our $debug;
17              
18             sub new {
19 0     0 1   my ($class, %opts) = @_;
20 0           my $max_workers = delete $opts{workers};
21 0           my $max_conns = delete $opts{connections};
22 0           my $reconnections = delete $opts{reconnections};
23 0           my $on_error = delete $opts{on_error};
24              
25 0 0         if ($max_conns) {
26 0 0         if ($max_workers) {
27 0 0         $max_conns < $max_workers and
28             croak "connections ($max_conns) < workers ($max_workers)";
29             }
30             else {
31 0           $max_workers = $max_conns;
32             }
33             }
34              
35 0 0         %opts and croak "unknonwn option(s): ". join(", ", keys %opts);
36              
37 0           my $self = { joins => {},
38             hosts => {},
39             host_by_pid => {},
40             ssh_master_by_pid => {},
41             in_state => {
42             connecting => {},
43             ready => {},
44             running => {},
45             done => {},
46             waiting => {},
47             suspended => {},
48             join_failed => {},
49             },
50             connected => { suspended => {},
51             waiting => {},
52             join_failed => {},
53             },
54             joins => {},
55             max_workers => $max_workers,
56             max_conns => $max_conns,
57             num_conns => 0,
58             reconnections => $reconnections,
59             on_error => $on_error,
60             };
61 0           bless $self, $class;
62 0           $self;
63             }
64              
65             my %debug_channel = (api => 1, state => 2, select => 4, at => 8,
66             action => 16, join => 32, workers => 64,
67             connect => 128, conns => 256, error => 512);
68              
69             sub _debug {
70 0     0     my $channel = shift;
71 0 0         my $bit = $debug_channel{$channel}
72             or die "internal error: bad debug channel $channel";
73 0 0         if ($debug & $debug_channel{$channel}) {
74 0 0         print STDERR sprintf("%6.3f", (time - $^T)), "| ",
75 0           join('', map { defined($_) ? $_ : '' } @_), "\n";
76             }
77             }
78              
79             sub add_host {
80 0     0 1   my $self = shift;
81 0           my $label = shift;
82 0 0         $label =~ /([,*!()<>\/{}])/ and croak "invalid char '$1' in host label";
83 0 0         my %opts = (@_ & 1 ? (host => @_) : @_);
84 0 0         $opts{host} = $label unless defined $opts{host};
85 0 0         $opts{batch_mode} = 1 unless defined $opts{batch_mode};
86              
87 0           my $on_error = delete $opts{on_error};
88 0           my $reconnections = delete $opts{reconnections};
89              
90 0           my $host = { label => $label,
91             workers => 1,
92             opts => \%opts,
93             ssh => undef,
94             state => 'done',
95             queue => [],
96             on_error => $on_error,
97             reconnections => $reconnections,
98             };
99              
100 0           $self->{hosts}{$label} = $host;
101 0 0         $debug and _debug(api => "[$label] added ($host)");
102 0           $self->{in_state}{done}{$label} = 1;
103 0 0         $debug and _debug(state => "[$label] state set to done");
104             }
105              
106             sub _set_host_state {
107 0     0     my ($self, $label, $state) = @_;
108 0           my $host = $self->{hosts}{$label};
109 0           my $old = $host->{state};
110 0 0         delete $self->{in_state}{$old}{$label}
111             or die "internal error: host $label is in state $old but not in such queue";
112 0 0 0       delete $self->{connected}{$old}{$label}
      0        
113             if ($old eq 'suspended' or $old eq 'waiting' or $old eq 'join_failed');
114              
115 0           $self->{in_state}{$state}{$label} = 1;
116 0           $host->{state} = $state;
117 0 0         $debug and _debug(state => "[$label] state changed $old --> $state");
118              
119 0 0 0       if ($host->{ssh} and ($state eq 'suspended' or
      0        
120             $state eq 'waiting' or
121             $state eq 'join_failed')) {
122 0           $self->{connected}{$state}{$label} = 1;
123 0 0         $debug and _debug(state => "[$label] host is connected");
124             }
125             }
126              
127             my %sel2re_cache;
128              
129             sub _selector_to_re {
130 0     0     my ($self, $part) = @_;
131 0   0       $sel2re_cache{$part} ||= do {
132 0           $part = quotemeta $part;
133 0           $part =~ s/\\\*/.*/g;
134 0           qr/^$part$/;
135             }
136             }
137              
138             sub _select_labels {
139 0     0     my ($self, $selector) = @_;
140 0           my %sel;
141 0           my @parts = split /\s*,\s*/, $selector;
142 0           for (@parts) {
143 0           my $re = $self->_selector_to_re($_);
144 0           $sel{$_} = 1 for grep $_ =~ $re, keys %{$self->{hosts}};
  0            
145             }
146 0           my @labels = keys %sel;
147 0 0         $debug and _debug(select => "selector($selector) --> [", join(', ', @labels), "]");
148 0           return @labels;
149             }
150              
151 0     0 1   sub all { shift->push('*', @_) }
152              
153             my %push_action_alias = (get => 'scp_get',
154             put => 'scp_put',
155             psub => 'parsub',
156             cmd => 'command');
157              
158             my %push_min_args = ( here => 1,
159             goto => 1,
160             stop => 0,
161             sub => 1,
162             parsub => 1,
163             scp_get => 2,
164             scp_put => 2,
165             rsync_get => 2,
166             rsync_put => 2 );
167              
168             my %push_max_args = ( here => 1,
169             goto => 1,
170             stop => 0 );
171              
172             sub push {
173 0     0 1   my $self = shift;
174 0           my $selector = shift;
175 0           my $action = shift;
176 0           my $in_state = $self->{in_state};
177              
178 0 0         if (ref $action eq 'CODE') {
179 0           unshift @_, $action;
180 0           $action = 'sub';
181             }
182              
183 0           my $alias = $push_action_alias{$action};
184 0 0         $action = $alias if defined $alias;
185              
186 0 0         $action =~ /^(?:command|(?:(?:rsync|scp)_(?:get|put))|join|sub|parsub|here|stop|goto|_notify)$/
187             or croak "bad action '$action'";
188              
189 0 0 0       my %opts = (($action ne 'sub' and ref $_[0] eq 'HASH') ? %{shift()} : ());
  0            
190 0 0 0       %opts and grep($action eq $_, qw(join here))
191             and croak "unsupported option(s) '" . join("', '", keys %opts) . "' in $action action";
192              
193 0           my @labels = $self->_select_labels($selector);
194              
195 0           my $max = $push_max_args{$action};
196 0 0 0       croak "too many parameters for action $action"
197             if (defined $max and $max < @_);
198              
199 0           my $min = $push_min_args{$action};
200 0 0 0       croak "too few parameters for action $action"
201             if (defined $min and $min > @_);
202              
203 0 0         if ($action eq 'join') {
204 0           my $notify_selector = shift @_;
205 0           my $join = { id => '#' . $self->{join_seq}++,
206             depends => {},
207             notify => [] };
208             my @depends = $self->push($notify_selector, _notify => {}, $join)
209 0 0         or do {
210 0           $join->_debug(join => "join $join->{id} does not depend on anything, ignoring!");
211 0           return ();
212             };
213 0           $join->{depends}{$_} = 1 for @depends;
214              
215 0           for my $label (@labels) {
216 0           my $host = $self->{hosts}{$label};
217 0           push @{$host->{queue}}, [join => {}, $join];
  0            
218 0 0         $debug and _debug(api => "[$label] join $join->{id} queued");
219 0 0         $self->_set_host_state($label, 'ready')
220             if $in_state->{done}{$label};
221             }
222             }
223             else {
224 0           for my $label (@labels) {
225 0           my $host = $self->{hosts}{$label};
226 0           push @{$host->{queue}}, [$action, \%opts, @_];
  0            
227 0 0         $debug and _debug(api => "[$label] action $action queued");
228 0 0         $self->_set_host_state($label, 'ready')
229             if $in_state->{done}{$label};
230             }
231             }
232 0           @labels;
233             }
234              
235             sub _audit_conns {
236 0     0     my $self = shift;
237 0           my $hosts = $self->{hosts};
238 0           my $num = 0;
239 0           $num++ for grep $_->{ssh}, values %$hosts;
240 0 0         $debug and _debug(conns => "audit_conns counted: $num, saved: $self->{num_conns}");
241 0 0         $num == $self->{num_conns}
242             or die "internal error: wrong number of connections, counted: $num, saved: $self->{num_conns}";
243 0           my $in_state = $self->{in_state};
244 0           for my $state (keys %$in_state) {
245 0           my $num = 0;
246 0           $num++ for grep $hosts->{$_}{ssh}, keys %{$in_state->{$state}};
  0            
247 0           my $total = keys %{$in_state->{$state}};
  0            
248 0           print STDERR "conns in state $state: $num of $total\n";
249             }
250             }
251              
252             sub _hash_chain_get {
253 0     0     my $name = shift;
254 0           for (@_) {
255 0 0         if (defined $_) {
256 0           my $v = $_->{$name};
257 0 0         return $v if defined $v;
258             }
259             }
260 0           undef;
261             }
262              
263             sub _at_error {
264 0     0     my ($self, $label, $error) = @_;
265 0           my $host = $self->{hosts}{$label};
266 0           my $task = delete $host->{current_task};
267 0           my $queue = $host->{queue};
268              
269 0 0         $debug and _debug(error => "_at_error label: $label, error: $error");
270              
271 0           my $opts;
272 0 0         $opts = $task->[1] if $task;
273              
274 0           my $on_error;
275 0 0         if ($error == OSSH_MASTER_FAILED) {
276 0 0         if ($host->{state} eq 'connecting') {
277             # task is not set in state connecting!
278 0 0         $task and die "internal error: task is defined in state connecting";
279 0 0         $opts = $queue->[0][1] if @$queue;
280             }
281 0   0       my $max_reconnections = _hash_chain_get(reconnections => $opts, $host, $self) || 0;
282 0   0       my $reconnections = $host->{current_task_reconnections}++ || 0;
283 0 0         $debug and _debug(error => "[$label] reconnection: $reconnections, max: $max_reconnections");
284 0 0         if ($reconnections < $max_reconnections) {
285 0 0         $debug and _debug(error => "[$label] will reconnect!");
286 0           $on_error = OSSH_ON_ERROR_RETRY;
287             }
288             }
289 0   0       $on_error ||= _hash_chain_get(on_error => $opts, $host, $self);
290              
291 0 0         if (ref $on_error eq 'CODE') {
292 0 0         if ($error == OSSH_JOIN_FAILED) {
293 0           $on_error = $on_error->($self, $label, $error);
294             }
295             else {
296 0           $on_error = $on_error->($self, $label, $error, $task);
297             }
298             }
299              
300 0 0 0       $on_error = OSSH_ON_ERROR_ABORT if (not defined $on_error or
301             $error == OSSH_ABORTED);
302              
303 0 0         $debug and _debug(error => "[$label] on_error (final): $on_error, error: $error (".($error+0).")");
304              
305 0 0         if ($on_error == OSSH_ON_ERROR_RETRY) {
306 0 0         if ($error == OSSH_MASTER_FAILED) {
    0          
307 0           $self->_set_host_state($label, 'suspended');
308 0           $self->_disconnect_host($label);
309 0           $self->_set_host_state($label, 'ready');
310             }
311             elsif ($error == OSSH_GOTO_FAILED) {
312             # No way to retry after a GOTO error!
313             # That should probably croak, but that would leave unmanaged
314             # processes running
315 0           $on_error = OSSH_ON_ERROR_ABORT;
316             }
317             else {
318 0           unshift @$queue, $task;
319             }
320 0           return;
321             }
322              
323 0           delete $host->{current_task_reconnections};
324              
325 0 0         if ($on_error == OSSH_ON_ERROR_IGNORE) {
326 0 0         if ($error == OSSH_MASTER_FAILED) {
327             # stablishing a new connection failed, what we should do?
328             # currently we remove the current task from the queue and
329             # continue.
330 0 0         shift @$queue unless $task;
331 0           $self->_set_host_state($label, 'suspended');
332 0           $self->_disconnect_host($label);
333 0           $self->_set_host_state($label, 'ready');
334             }
335             else {
336 0           $self->_set_host_state($label, 'ready');
337             }
338             # else do nothing
339             }
340             else {
341 0 0 0       unless ($on_error == OSSH_ON_ERROR_DONE or
      0        
342             $on_error == OSSH_ON_ERROR_ABORT or
343             $on_error == OSSH_ON_ERROR_ABORT_ALL) {
344 0           carp "bad on_error code $on_error";
345 0           $on_error = OSSH_ON_ERROR_ABORT;
346             }
347 0           my $queue = $host->{queue};
348 0           my $failed = ($on_error != OSSH_ON_ERROR_DONE);
349 0 0         $debug and _debug(error => "[$label] dropping queue, ", scalar(@$queue), " jobs");
350 0           while (my $task = shift @$queue) {
351 0           my ($action, undef, $join) = @$task;
352 0 0         $debug and _debug(error => "[$label] remove action $action from queue");
353 0 0         $self->_join_notify($label, $join, $failed)
354             if $action eq '_notify';
355             }
356              
357 0 0         $on_error == OSSH_ON_ERROR_ABORT_ALL
358             and $self->{abort_all} = 1;
359              
360 0           $self->_set_host_state($label, 'done');
361 0           $self->_disconnect_host($label);
362 0           $host->{error} = $error;
363             }
364             }
365              
366             sub _at_connect {
367 0     0     my ($self, $label) = @_;
368 0           my $host = $self->{hosts}{$label};
369 0 0         $debug and _debug(connect => "[$label] _connect, starting SSH connection");
370 0 0         $host->{ssh} and die "internal error: connecting host is already connected";
371 0           my $ssh = $host->{ssh} = Net::OpenSSH->new(expand_vars => 1,
372 0           %{$host->{opts}},
373             async => 1);
374 0           $ssh->set_var(LABEL => $label);
375 0           my $master_pid = $ssh->get_master_pid;
376 0           $host->{master_pid} = $master_pid;
377 0           $self->{ssh_master_by_pid}{$master_pid} = $label;
378 0           $self->{num_conns}++;
379 0           $self->_set_host_state($label, 'connecting');
380 0 0         if ($ssh->error) {
381 0           $self->_at_error($label, $ssh->error);
382             }
383             }
384              
385             sub _at_connecting {
386 0     0     my ($self, $label) = @_;
387 0           my $host = $self->{hosts}{$label};
388 0 0         $debug and _debug(at => "[$label] at_connecting, waiting for master");
389 0           my $ssh = $host->{ssh};
390 0 0         if ($ssh->wait_for_master(1)) {
    0          
391 0 0         $debug and _debug(at => "[$label] at_connecting, master connected");
392 0           $self->_set_host_state($label, 'ready');
393             }
394             elsif ($ssh->error) {
395 0           $self->_at_error($label, $ssh->error);
396             }
397             }
398              
399             sub _join_notify {
400 0     0     my ($self, $label, $join, $failed) = @_;
401             # use Data::Dumper;
402             # print STDERR Dumper $join;
403 0 0         delete $join->{depends}{$label}
404             or die "internal error: $join->{id} notified for non dependent label $label";
405 0 0         $debug and _debug(join => "removing dependent $label from join $join->{id}");
406 0 0         $join->{failed} = 1 if $failed;
407 0 0         if (not %{$join->{depends}}) {
  0            
408 0 0         $debug and _debug(join => "join $join->{id} done");
409 0           $join->{done} = 1;
410 0           my $failed = $join->{failed};
411 0           for my $label (@{$join->{notify}}) {
  0            
412 0 0         $debug and _debug(join => "notifying $label about join $join->{id} done");
413 0 0         $self->_set_host_state($label, $failed ? 'join_failed' : 'ready');
414             }
415             }
416             # print STDERR Dumper $join;
417             }
418              
419             sub _num_workers {
420 0     0     my $in_state = shift->{in_state};
421 0           ( keys(%{$in_state->{ready}}) +
  0            
422 0           keys(%{$in_state->{running}}) +
423 0           keys(%{$in_state->{connecting}}) );
424             }
425              
426             sub _disconnect_host {
427 0     0     my ($self, $label) = @_;
428 0           my $host = $self->{hosts}{$label};
429 0           my $state = $host->{state};
430 0 0         $state =~ /^(?:waiting|suspended|done|connecting)$/
431             or die "internal error: disconnecting $label in state $state";
432 0 0         if ($host->{ssh}) {
433 0 0         $debug and _debug(connect => "[$label] disconnecting host");
434 0           my $master_pid = delete $host->{master_pid};
435 0 0         delete $self->{ssh_master_by_pid}{$master_pid}
436             if defined $master_pid;
437 0           undef $host->{ssh};
438 0           $self->{num_conns}--;
439 0           $self->_set_host_state($label, $state);
440             }
441             }
442              
443             sub _disconnect_any_host {
444 0     0     my $self = shift;
445 0           my $connected = $self->{connected};
446 0 0         $debug and _debug(conns => "disconnect any host");
447             # $self->_audit_conns;
448 0           my $label;
449 0           for my $state (qw(suspended join_failed waiting)) {
450             # use Data::Dumper;
451             # print Dumper $connected;
452 0 0         $debug and _debug(conns => "looking for connected host in state $state");
453 0           ($label) = each %{$connected->{$state}};
  0            
454 0           keys %{$connected->{$state}}; # reset iterator
  0            
455 0 0         last if defined $label;
456             }
457 0 0         $debug and _debug(conns => "[$label] disconnecting");
458 0 0         defined $label or die "internal error: unable to disconnect any host";
459 0           $self->_disconnect_host($label);
460             }
461              
462             my @private_opts = qw(on_error or_goto reconnections);
463              
464             sub _at_ready {
465 0     0     my ($self, $label) = @_;
466 0 0         if (my $max_workers = $self->{max_workers}) {
467 0           my $in_state = $self->{in_state};
468 0           my $num_workers = $self->_num_workers;
469 0 0         $debug and _debug(workers => "num workers: $num_workers, maximun: $max_workers");
470 0 0         if ($num_workers > $max_workers) {
471 0 0         $debug and _debug(workers => "[$label] suspending");
472 0           $self->_set_host_state($label, 'suspended');
473 0           return;
474             }
475             }
476              
477 0           my $host = $self->{hosts}{$label};
478 0 0         $debug and _debug(at => "[$label] at_ready");
479              
480 0           my $queue = $host->{queue};
481              
482 0 0         if ($self->{abort_all}) {
483 0           $self->_at_error($label, OSSH_ABORTED);
484 0           return;
485             }
486              
487 0           while (defined (my $task = shift @$queue)) {
488 0           my $action = shift @$task;
489 0 0         $debug and _debug(at => "[$label] at_ready, starting new action $action");
490 0 0         if ($action eq 'join') {
    0          
    0          
    0          
    0          
    0          
491 0           my (undef, $join) = @$task;
492 0 0         if ($join->{done}) {
493 0 0         $debug and _debug(join => "join[$join->{id}] is done");
494 0 0         if ($join->{failed}) {
495 0           $self->_at_error($label, OSSH_JOIN_FAILED);
496 0           return;
497             }
498 0 0         $debug and _debug(action => "[$label] join $join->{id} already done");
499 0           next;
500             }
501 0           CORE::push @{$join->{notify}}, $label;
  0            
502 0           $self->_set_host_state($label, 'waiting');
503 0           return;
504             }
505             elsif ($action eq 'here') {
506 0           next;
507             }
508             elsif ($action eq 'stop') {
509 0           $self->_skip($label, 'END');
510 0           next;
511             }
512             elsif ($action eq 'goto') {
513 0           my (undef, $target) = @$task;
514 0           $self->_skip($label, $target);
515 0           next;
516             }
517             elsif ($action eq '_notify') {
518 0           my (undef, $join) = @$task;
519 0           $self->_join_notify($label, $join);
520 0           next;
521             }
522             elsif ($action eq 'sub') {
523             # use Data::Dumper;
524             # _debug (action => Dumper(@$task));
525 0           shift @$task;
526 0           my $sub = shift @$task;
527 0 0         $debug and _debug(action => "[$label] calling sub $sub");
528 0           $sub->($self, $label, @$task);
529 0           next;
530             }
531             else {
532 0           my $ssh = $host->{ssh};
533 0 0 0       unless ($action eq 'parsub' and $task->[0]{no_ssh}) {
534 0 0         unless ($ssh) {
535             # unshift the task we have just removed and connect first:
536 0           unshift @$task, $action;
537 0           unshift @$queue, $task;
538 0 0         if (my $max_conns = $self->{max_conns}) {
539 0 0         $self->_disconnect_any_host() if $self->{num_conns} >= $max_conns;
540             }
541 0 0         $debug and _debug(at => "[$label] host is not connected, connecting...");
542 0           $self->_at_connect($label);
543 0           return;
544             }
545              
546 0 0         if (my $error = $ssh->error) {
547 0           $self->_at_error($label, $error);
548 0           return;
549             }
550             }
551              
552 0           $host->{current_task} = [$action, @$task];
553 0           my %opts = %{shift @$task};
  0            
554 0           delete @opts{@private_opts};
555 0 0         my $method = $self->can("_start_$action")
556             or die "internal error: method _start_$action not found";
557 0           my $pid = $method->($self, $label, \%opts, @$task);
558 0 0         $debug and _debug(action => "[$label] action pid: ", $pid);
559 0 0         unless (defined $pid) {
560 0   0       my $error = (($ssh && $ssh->error) ||
561             dualvar(($action eq 'parsub'
562             ? "Unable to fork parsub"
563             : "Action $action failed to start"), OSSH_SLAVE_FAILED));
564 0           $self->_at_error($label, $error);
565 0           return;
566             }
567 0           $self->{host_by_pid}{$pid} = $label;
568 0           $self->_set_host_state($label, 'running');
569 0           return;
570             }
571             }
572 0 0         $debug and _debug(at => "[$label] at_ready, queue_is_empty, we are done!");
573 0           $self->_set_host_state($label, 'done');
574 0           $self->_disconnect_host($label);
575             }
576              
577             sub _start_parsub {
578 0     0     my $self = shift;
579 0           my $label = shift;
580 0           my $opts = shift;
581 0           my $sub = shift;
582 0           my $ssh = $self->{hosts}{$label}{ssh};
583 0 0         $debug and _debug(action => "[$label] start parsub action [@_]");
584 0           my $pid = fork;
585 0 0         unless ($pid) {
586 0 0         defined $pid or return;
587 0           eval { $sub->($label, $ssh, @_) };
  0            
588 0 0 0       $@ and $debug and _debug(error => "slave died on parsub: $@");
589 0 0         POSIX::_exit($@ ? 1 : 0);
590             }
591 0           $pid;
592             }
593              
594             sub _start_command {
595 0     0     my $self = shift;
596 0           my $label = shift;
597 0           my $opts = shift;
598 0           my $ssh = $self->{hosts}{$label}{ssh};
599 0 0         $debug and _debug(action => "[$label] start command action [@_]");
600 0           $ssh->spawn($opts, @_);
601             }
602              
603             sub _start_scp_get {
604 0     0     my $self = shift;
605 0           my $label = shift;
606 0           my $opts = shift;
607 0           my $ssh = $self->{hosts}{$label}{ssh};
608 0 0         $debug and _debug(action => "[$label] start scp_get action");
609 0           $opts->{async} = 1;
610 0           $ssh->scp_get($opts, @_);
611             }
612              
613             sub _start_scp_put {
614 0     0     my $self = shift;
615 0           my $label = shift;
616 0           my $opts = shift;
617 0           my $ssh = $self->{hosts}{$label}{ssh};
618 0 0         $debug and _debug(action => "[$label] start scp_put action");
619 0           $opts->{async} = 1;
620 0           $ssh->scp_put($opts, @_);
621             }
622              
623             sub _start_rsync_get {
624 0     0     my $self = shift;
625 0           my $label = shift;
626 0           my $opts = shift;
627 0           my $ssh = $self->{hosts}{$label}{ssh};
628 0 0         $debug and _debug(action => "[$label] start rsync_get action");
629 0           $opts->{async} = 1;
630 0           $ssh->rsync_get($opts, @_);
631             }
632              
633             sub _start_rsync_put {
634 0     0     my $self = shift;
635 0           my $label = shift;
636 0           my $opts = shift;
637 0           my $ssh = $self->{hosts}{$label}{ssh};
638 0 0         $debug and _debug(action => "[$label] start rsync_put action");
639 0           $opts->{async} = 1;
640 0           $ssh->rsync_put($opts, @_);
641             }
642              
643             # FIXME: dead code?
644             sub _start_join {
645 0     0     my $self = shift;
646 0           my $label = shift;
647 0           warn "internal mismatch: this shouldn't be happening!";
648             }
649              
650             sub _skip {
651 0     0     my ($self, $label, $target) = @_;
652 0 0         $debug and _debug(action => "skipping until $target");
653 0           my $host = $self->{hosts}{$label};
654 0           my $queue = $host->{queue};
655 0           my ($ix, $task);
656 0           for ($ix = 0; defined($task = $queue->[$ix]); $ix++) {
657 0 0 0       last if ($task->[0] eq 'here' and $task->[2] eq $target);
658             }
659 0 0 0       if ($task or $target eq 'END') {
660 0           for (1..$ix) {
661 0           my $task = shift @$queue;
662 0 0         $self->_join_notify($label, $task->[2])
663             if $task->[0] eq '_notify';
664             }
665 0           return;
666             }
667 0 0         $debug and _debug(action => "here label $target not found");
668 0           $self->_at_error($label, OSSH_GOTO_FAILED);
669             }
670              
671             sub _finish_task {
672 0     0     my ($self, $pid) = @_;
673 0           my $label = delete $self->{host_by_pid}{$pid};
674              
675 0 0         if (defined $label) {
676 0 0         $debug and _debug(action => "[$label] action finished pid: $pid, rc: $?");
677 0           my $host = $self->{hosts}{$label};
678 0           my $or_goto;
679 0 0         if ($?) {
680 0           my ($action) = @{$host->{current_task}};
  0            
681 0           my $rc = ($? >> 8);
682 0 0         my $ssh = $host->{ssh} or die "internal error: $label is not connected";
683 0           my $error = $ssh->error;
684 0 0 0       $or_goto = $host->{current_task}[1]{or_goto} unless ($error or $rc == 255);
685 0 0         if (defined $or_goto) {
686 0 0         $debug and _debug(action => "[$label] skipping to $or_goto with rc = $rc");
687             }
688             else {
689 0   0       $error ||= dualvar(OSSH_SLAVE_FAILED,
690             "child exited with non-zero return code ($rc)");
691 0           $self->_at_error($label, $error);
692             return
693 0           }
694             }
695 0           $self->_set_host_state($label, 'ready');
696 0 0         $self->_skip($label, $or_goto) if defined $or_goto;
697 0           delete $host->{current_task};
698 0           delete $host->{current_task_reconnections};
699             }
700             else {
701 0           my $label = delete $self->{ssh_master_by_pid}{$pid};
702 0 0         defined $label or carp "spurious child exit (pid: $pid)";
703              
704 0 0         $debug and _debug(action => "[$label] master ssh exited");
705 0           my $host = $self->{hosts}{$label};
706 0 0         my $ssh = $host->{ssh}
707             or die ("internal error: master ssh process exited but ".
708             "there is no ssh object associated to host $label");
709 0           $ssh->master_exited;
710 0           my $state = $host->{state};
711             # do error handler later...
712             }
713             }
714              
# Reap exited children, sleeping for up to $time seconds if none has
# exited yet.
#
#   $time - maximum number of seconds to block in select; 0 means
#           "just poll, never sleep".
#
# Children are collected with a non-blocking waitpid both before and
# after the select call. When the first round already reaps something
# (or $time is 0) select is skipped and the sub returns at once.
# Every reaped pid is handed to _finish_task. Returns 1.
sub _wait_for_jobs {
    my ($self, $time) = @_;
    my $dontwait = ($time == 0);
    $debug and _debug(at => "_wait_for_jobs time: $time");

    # Round 1 polls and possibly sleeps; round 2 polls again and
    # always returns because $dontwait has been set by then.
    for my $round (1, 2) {
        # waitpid is only attempted while some host is in the
        # "running" state.
        if (%{$self->{in_state}{running}}) {
            $debug and _debug(at => "_wait_for_jobs reaping children");
            while ((my $pid = waitpid(-1, WNOHANG)) > 0) {
                $debug and _debug(action => "waitpid caught pid: $pid, rc: $?");
                $dontwait = 1;
                $self->_finish_task($pid);
            }
        }

        return 1 if $dontwait;

        $debug and _debug(at => "_wait_for_jobs calling select");
        {
            # Hack: with a (no-op) CHLD handler installed, select
            # finishes as soon as a CHLD signal is delivered.
            local $SIG{CHLD} = sub {};
            select(undef, undef, undef, $time);
        }
        $dontwait = 1;
    }
}
744              
# Remove the "error" entry from every host record stored in
# $self->{hosts}; all other per-host data is left untouched.
sub _clean_errors {
    my ($self) = @_;
    for my $host (values %{$self->{hosts}}) {
        delete $host->{error};
    }
}
749              
# Drive every registered host through its states until all of them
# are in the "done" state.
#
#   $time - accepted for backward compatibility; this sub never
#           reads it.
#
# Each iteration dispatches the hosts found in the connecting, ready
# and join_failed states, wakes suspended hosts when a worker limit
# is configured and there is spare capacity, and then waits for
# child processes via _wait_for_jobs.
#
# Returns true when no host has an error recorded once the loop ends,
# false otherwise.
sub run {
    my ($self, $time) = @_;

    # Errors left over from a previous run must not leak into this one.
    $self->_clean_errors;

    my $hosts       = $self->{hosts};
    my $max_workers = $self->{max_workers};
    my ($connecting, $ready, $running, $waiting, $suspended, $join_failed, $done) =
        @{$self->{in_state}}{qw(connecting ready running waiting suspended join_failed done)};
    my $connected_suspended = $self->{connected}{suspended};

    while (1) {
        $debug and _debug(api => "run: iterating...");

        $debug and _debug(at => "run: hosts at connecting: ", scalar(keys %$connecting));
        $self->_at_connecting($_) for keys %$connecting;

        $debug and _debug(at => "run: hosts at ready: ", scalar(keys %$ready));
        $self->_at_ready($_) for keys %$ready;

        $debug and _debug(at => 'run: hosts at join_failed: ', scalar(keys %$join_failed));
        $self->_at_error($_, OSSH_JOIN_FAILED) for keys %$join_failed;

        if ($max_workers) {
            $debug and _debug(at => "run: hosts at suspended:", scalar(keys %$suspended));
            if (%$suspended) {
                # Number of additional hosts that may be woken up
                # without exceeding the worker limit.
                my $awake = $max_workers - $self->_num_workers;

                # The connected-and-suspended set is scanned before
                # the general suspended set.
                for my $hash ($connected_suspended, $suspended) {
                    # The list is per hash, so hosts woken from the
                    # first hash are not handed to _set_host_state a
                    # second time after the second hash is scanned.
                    my @labels;
                    while ($awake > 0) {
                        my ($label) = each %$hash or last;
                        CORE::push @labels, $label;
                        $awake--;
                    }

                    # State changes are applied only after the "each"
                    # walk, so the hash is not modified while it is
                    # being iterated.
                    for my $label (@labels) {
                        $debug and _debug(workers => "[$label] awaking");
                        $self->_set_host_state($label, 'ready');
                    }

                    # The walk above may stop half way; reset the
                    # iterator so the next "each" starts from scratch.
                    keys %$hash;
                }
            }
        }

        $debug and _debug(at => "run: hosts at waiting: ", scalar(keys %$waiting));
        $debug and _debug(at => "run: hosts at running: ", scalar(keys %$running));
        $debug and _debug(at => "run: hosts at done: ", scalar(keys %$done), " of ", scalar(keys %$hosts));

        last if keys(%$hosts) == keys(%$done);

        # Do not sleep while hosts are ready to be dispatched, poll
        # quickly while connections are being established, and
        # otherwise wait up to five seconds for a child to exit.
        my $delay = ( %$ready      ? 0   :
                      %$connecting ? 0.3 :
                                     5.0 );
        $self->_wait_for_jobs($delay);
    }

    delete $self->{abort_all};

    my $error;
    for my $label (sort keys %$hosts) {
        $hosts->{$label}{error} and $error = 1;
        $debug and _debug(error => "[$label] error: ", $hosts->{$label}{error});
    }
    return !$error;
}
818              
# Return the error recorded for the host registered under $label
# (undef when that host has no error entry).
#
# Croaks when no host has been added under $label.
sub get_error {
    my $self  = shift;
    my $label = shift;

    my $host = $self->{hosts}{$label};
    croak "no such host $label has been added" unless $host;

    return $host->{error};
}
825              
# Report the hosts that currently have an error recorded.
#
# In list context returns a flat list of (label => error) pairs,
# ordered by label, for every host whose error is defined. In any
# other context returns how many hosts have a defined error.
sub get_errors {
    my ($self) = @_;
    my @labels = keys %{$self->{hosts}};

    unless (wantarray) {
        return scalar grep { defined $self->get_error($_) } @labels;
    }

    my @pairs;
    for my $label (sort @labels) {
        my $error = $self->get_error($label);
        next unless defined $error;
        # CORE::push is spelled out, as in this file's "run" sub.
        CORE::push @pairs, $label, $error;
    }
    return @pairs;
}
838              
839             1;
840              
841             __END__