File Coverage

blib/lib/Net/OpenSSH/Parallel.pm
Criterion Covered Total %
statement 24 504 4.7
branch 0 330 0.0
condition 0 64 0.0
subroutine 8 41 19.5
pod 7 7 100.0
total 39 946 4.1


line stmt bran cond sub pod time code
1              
2             package Net::OpenSSH::Parallel;
3              
4             our $VERSION = '0.14';
5              
6 1     1   22095 use strict;
  1         1  
  1         29  
7 1     1   5 use warnings;
  1         2  
  1         32  
8 1     1   4 use Carp qw(croak carp);
  1         6  
  1         89  
9             our @CARP_NOT=qw(Net::OpenSSH);
10              
11 1     1   1630 use Net::OpenSSH;
  1         42666  
  1         46  
12 1     1   676 use Net::OpenSSH::Parallel::Constants qw(:error :on_error);
  1         3  
  1         237  
13              
14 1     1   5 use POSIX qw(WNOHANG);
  1         2  
  1         7  
15 1     1   989 use Time::HiRes qw(time);
  1         2255  
  1         4  
16 1     1   159 use Scalar::Util qw(dualvar);
  1         2  
  1         7022  
17              
18             our $debug;
19              
20             sub new {
21 0     0 1   my ($class, %opts) = @_;
22 0           my $max_workers = delete $opts{workers};
23 0           my $max_conns = delete $opts{connections};
24 0           my $reconnections = delete $opts{reconnections};
25 0           my $on_error = delete $opts{on_error};
26              
27 0 0         if ($max_conns) {
28 0 0         if ($max_workers) {
29 0 0         $max_conns < $max_workers and
30             croak "connections ($max_conns) < workers ($max_workers)";
31             }
32             else {
33 0           $max_workers = $max_conns;
34             }
35             }
36              
37 0 0         %opts and croak "unknonwn option(s): ". join(", ", keys %opts);
38              
39 0           my $self = { joins => {},
40             hosts => {},
41             host_by_pid => {},
42             ssh_master_by_pid => {},
43             in_state => {
44             connecting => {},
45             ready => {},
46             running => {},
47             done => {},
48             waiting => {},
49             suspended => {},
50             join_failed => {},
51             },
52             connected => { suspended => {},
53             waiting => {},
54             join_failed => {},
55             },
56             joins => {},
57             max_workers => $max_workers,
58             max_conns => $max_conns,
59             num_conns => 0,
60             reconnections => $reconnections,
61             on_error => $on_error,
62             };
63 0           bless $self, $class;
64 0           $self;
65             }
66              
67             my %debug_channel = (api => 1, state => 2, select => 4, at => 8,
68             action => 16, join => 32, workers => 64,
69             connect => 128, conns => 256, error => 512);
70              
71             sub _debug {
72 0     0     my $channel = shift;
73 0 0         my $bit = $debug_channel{$channel}
74             or die "internal error: bad debug channel $channel";
75 0 0         if ($debug & $debug_channel{$channel}) {
76             print STDERR sprintf("%6.3f", (time - $^T)), "| ",
77 0 0         join('', map { defined($_) ? $_ : '' } @_), "\n";
  0            
78             }
79             }
80              
81             sub add_host {
82 0     0 1   my $self = shift;
83 0           my $label = shift;
84 0 0         $label =~ /([,*!()<>\/{}])/ and croak "invalid char '$1' in host label";
85 0 0         my %opts = (@_ & 1 ? (host => @_) : @_);
86 0 0         $opts{host} = $label unless defined $opts{host};
87 0 0         $opts{batch_mode} = 1 unless defined $opts{batch_mode};
88              
89 0           my $on_error = delete $opts{on_error};
90 0           my $reconnections = delete $opts{reconnections};
91              
92 0           my $host = { label => $label,
93             workers => 1,
94             opts => \%opts,
95             ssh => undef,
96             state => 'done',
97             queue => [],
98             on_error => $on_error,
99             reconnections => $reconnections,
100             };
101              
102 0           $self->{hosts}{$label} = $host;
103 0 0         $debug and _debug(api => "[$label] added ($host)");
104 0           $self->{in_state}{done}{$label} = 1;
105 0 0         $debug and _debug(state => "[$label] state set to done");
106             }
107              
108             sub _set_host_state {
109 0     0     my ($self, $label, $state) = @_;
110 0           my $host = $self->{hosts}{$label};
111 0           my $old = $host->{state};
112 0 0         delete $self->{in_state}{$old}{$label}
113             or die "internal error: host $label is in state $old but not in such queue";
114 0 0 0       delete $self->{connected}{$old}{$label}
      0        
115             if ($old eq 'suspended' or $old eq 'waiting' or $old eq 'join_failed');
116              
117 0           $self->{in_state}{$state}{$label} = 1;
118 0           $host->{state} = $state;
119 0 0         $debug and _debug(state => "[$label] state changed $old --> $state");
120              
121 0 0 0       if ($host->{ssh} and ($state eq 'suspended' or
      0        
122             $state eq 'waiting' or
123             $state eq 'join_failed')) {
124 0           $self->{connected}{$state}{$label} = 1;
125 0 0         $debug and _debug(state => "[$label] host is connected");
126             }
127             }
128              
129             my %sel2re_cache;
130              
131             sub _selector_to_re {
132 0     0     my ($self, $part) = @_;
133 0   0       $sel2re_cache{$part} ||= do {
134 0           $part = quotemeta $part;
135 0           $part =~ s/\\\*/.*/g;
136 0           qr/^$part$/;
137             }
138             }
139              
140             sub _select_labels {
141 0     0     my ($self, $selector) = @_;
142 0           my %sel;
143 0           my @parts = split /\s*,\s*/, $selector;
144 0           for (@parts) {
145 0           my $re = $self->_selector_to_re($_);
146 0           $sel{$_} = 1 for grep $_ =~ $re, keys %{$self->{hosts}};
  0            
147             }
148 0           my @labels = keys %sel;
149 0 0         $debug and _debug(select => "selector($selector) --> [", join(', ', @labels), "]");
150 0           return @labels;
151             }
152              
153 0     0 1   sub all { shift->push('*', @_) }
154              
155             my %push_action_alias = (get => 'scp_get',
156             put => 'scp_put',
157             psub => 'parsub',
158             cmd => 'command');
159              
160             my %push_min_args = ( here => 1,
161             goto => 1,
162             stop => 0,
163             sub => 1,
164             parsub => 1,
165             scp_get => 2,
166             scp_put => 2,
167             rsync_get => 2,
168             rsync_put => 2 );
169              
170             my %push_max_args = ( here => 1,
171             goto => 1,
172             stop => 0 );
173              
174             sub push {
175 0     0 1   my $self = shift;
176 0           my $selector = shift;
177 0           my $action = shift;
178 0           my $in_state = $self->{in_state};
179              
180 0 0         if (ref $action eq 'CODE') {
181 0           unshift @_, $action;
182 0           $action = 'sub';
183             }
184              
185 0           my $alias = $push_action_alias{$action};
186 0 0         $action = $alias if defined $alias;
187              
188 0 0         $action =~ /^(?:command|(?:(?:rsync|scp)_(?:get|put))|join|sub|parsub|here|stop|goto|_notify|connect)$/
189             or croak "bad action '$action'";
190              
191 0 0 0       my %opts = (($action ne 'sub' and ref $_[0] eq 'HASH') ? %{shift()} : ());
  0            
192 0 0 0       %opts and grep($action eq $_, qw(join here))
193             and croak "unsupported option(s) '" . join("', '", keys %opts) . "' in $action action";
194              
195 0           my @labels = $self->_select_labels($selector);
196              
197 0           my $max = $push_max_args{$action};
198 0 0 0       croak "too many parameters for action $action"
199             if (defined $max and $max < @_);
200              
201 0           my $min = $push_min_args{$action};
202 0 0 0       croak "too few parameters for action $action"
203             if (defined $min and $min > @_);
204              
205 0 0         if ($action eq 'join') {
206 0           my $notify_selector = shift @_;
207 0           my $join = { id => '#' . $self->{join_seq}++,
208             depends => {},
209             notify => [] };
210             my @depends = $self->push($notify_selector, _notify => {}, $join)
211 0 0         or do {
212 0           $join->_debug(join => "join $join->{id} does not depend on anything, ignoring!");
213 0           return ();
214             };
215 0           $join->{depends}{$_} = 1 for @depends;
216              
217 0           for my $label (@labels) {
218 0           my $host = $self->{hosts}{$label};
219 0           push @{$host->{queue}}, [join => {}, $join];
  0            
220 0 0         $debug and _debug(api => "[$label] join $join->{id} queued");
221             $self->_set_host_state($label, 'ready')
222 0 0         if $in_state->{done}{$label};
223             }
224             }
225             else {
226 0           for my $label (@labels) {
227 0           my $host = $self->{hosts}{$label};
228 0           push @{$host->{queue}}, [$action, \%opts, @_];
  0            
229 0 0         $debug and _debug(api => "[$label] action $action queued");
230             $self->_set_host_state($label, 'ready')
231 0 0         if $in_state->{done}{$label};
232             }
233             }
234 0           @labels;
235             }
236              
237             sub _audit_conns {
238 0     0     my $self = shift;
239 0           my $hosts = $self->{hosts};
240 0           my $num = 0;
241 0           $num++ for grep $_->{ssh}, values %$hosts;
242 0 0         $debug and _debug(conns => "audit_conns counted: $num, saved: $self->{num_conns}");
243             $num == $self->{num_conns}
244 0 0         or die "internal error: wrong number of connections, counted: $num, saved: $self->{num_conns}";
245 0           my $in_state = $self->{in_state};
246 0           for my $state (keys %$in_state) {
247 0           my $num = 0;
248 0           $num++ for grep $hosts->{$_}{ssh}, keys %{$in_state->{$state}};
  0            
249 0           my $total = keys %{$in_state->{$state}};
  0            
250 0           print STDERR "conns in state $state: $num of $total\n";
251             }
252             }
253              
254             sub _hash_chain_get {
255 0     0     my $name = shift;
256 0           for (@_) {
257 0 0         if (defined $_) {
258 0           my $v = $_->{$name};
259 0 0         return $v if defined $v;
260             }
261             }
262 0           undef;
263             }
264              
265             sub _at_error {
266 0     0     my ($self, $label, $error) = @_;
267 0           my $host = $self->{hosts}{$label};
268 0           my $task = delete $host->{current_task};
269 0           my $queue = $host->{queue};
270              
271 0 0         $debug and _debug(error => "_at_error label: $label, error: $error");
272              
273 0           my $opts;
274 0 0         $opts = $task->[1] if $task;
275              
276 0           my $on_error;
277 0 0         if ($error == OSSH_MASTER_FAILED) {
278 0 0         if ($host->{state} eq 'connecting') {
279             # task is not set in state connecting!
280 0 0         $task and die "internal error: task is defined in state connecting";
281 0 0         $opts = $queue->[0][1] if @$queue;
282             }
283 0   0       my $max_reconnections = _hash_chain_get(reconnections => $opts, $host, $self) || 0;
284 0   0       my $reconnections = $host->{current_task_reconnections}++ || 0;
285 0 0         $debug and _debug(error => "[$label] reconnection: $reconnections, max: $max_reconnections");
286 0 0         if ($reconnections < $max_reconnections) {
287 0 0         $debug and _debug(error => "[$label] will reconnect!");
288 0           $on_error = OSSH_ON_ERROR_RETRY;
289             }
290             }
291 0   0       $on_error ||= _hash_chain_get(on_error => $opts, $host, $self);
292              
293 0 0         if (ref $on_error eq 'CODE') {
294 0 0         if ($error == OSSH_JOIN_FAILED) {
295 0           $on_error = $on_error->($self, $label, $error);
296             }
297             else {
298 0           $on_error = $on_error->($self, $label, $error, $task);
299             }
300             }
301              
302 0 0 0       $on_error = OSSH_ON_ERROR_ABORT if (not defined $on_error or
303             $error == OSSH_ABORTED);
304              
305 0 0         $debug and _debug(error => "[$label] on_error (final): $on_error, error: $error (".($error+0).")");
306              
307 0 0         if ($on_error == OSSH_ON_ERROR_RETRY) {
308 0 0         if ($error == OSSH_MASTER_FAILED) {
    0          
309 0           $self->_set_host_state($label, 'suspended');
310 0           $self->_disconnect_host($label);
311 0           $self->_set_host_state($label, 'ready');
312             }
313             elsif ($error == OSSH_GOTO_FAILED) {
314             # No way to retry after a GOTO error!
315             # That should probably croak, but that would leave unmanaged
316             # processes running
317 0           $on_error = OSSH_ON_ERROR_ABORT;
318             }
319             else {
320 0           unshift @$queue, $task;
321             }
322 0           return;
323             }
324              
325 0           delete $host->{current_task_reconnections};
326              
327 0 0         if ($on_error == OSSH_ON_ERROR_IGNORE) {
328 0 0         if ($error == OSSH_MASTER_FAILED) {
329             # establishing a new connection failed, what we should do?
330             # currently we remove the current task from the queue and
331             # continue.
332 0 0         shift @$queue unless $task;
333 0           $self->_set_host_state($label, 'suspended');
334 0           $self->_disconnect_host($label);
335 0           $self->_set_host_state($label, 'ready');
336             }
337             else {
338 0           $self->_set_host_state($label, 'ready');
339             }
340             # else do nothing
341             }
342             else {
343 0 0 0       unless ($on_error == OSSH_ON_ERROR_DONE or
      0        
344             $on_error == OSSH_ON_ERROR_ABORT or
345             $on_error == OSSH_ON_ERROR_ABORT_ALL) {
346 0           carp "bad on_error code $on_error";
347 0           $on_error = OSSH_ON_ERROR_ABORT;
348             }
349 0           my $queue = $host->{queue};
350 0           my $failed = ($on_error != OSSH_ON_ERROR_DONE);
351 0 0         $debug and _debug(error => "[$label] dropping queue, ", scalar(@$queue), " jobs");
352 0           while (my $task = shift @$queue) {
353 0           my ($action, undef, $join) = @$task;
354 0 0         $debug and _debug(error => "[$label] remove action $action from queue");
355 0 0         $self->_join_notify($label, $join, $failed)
356             if $action eq '_notify';
357             }
358              
359             $on_error == OSSH_ON_ERROR_ABORT_ALL
360 0 0         and $self->{abort_all} = 1;
361              
362 0           $self->_set_host_state($label, 'done');
363 0           $self->_disconnect_host($label);
364 0           $host->{error} = $error;
365             }
366             }
367              
368             sub _at_connect {
369 0     0     my ($self, $label) = @_;
370 0           my $host = $self->{hosts}{$label};
371 0 0         $debug and _debug(connect => "[$label] _connect, starting SSH connection");
372 0 0         $host->{ssh} and die "internal error: connecting host is already connected";
373             my $ssh = $host->{ssh} = Net::OpenSSH->new(expand_vars => 1,
374 0           %{$host->{opts}},
  0            
375             async => 1);
376 0           $ssh->set_var(LABEL => $label);
377 0           my $master_pid = $ssh->get_master_pid;
378 0           $host->{master_pid} = $master_pid;
379 0           $self->{ssh_master_by_pid}{$master_pid} = $label;
380 0           $self->{num_conns}++;
381 0           $self->_set_host_state($label, 'connecting');
382 0 0         if ($ssh->error) {
383 0           $self->_at_error($label, $ssh->error);
384             }
385             }
386              
387             sub _at_connecting {
388 0     0     my ($self, $label) = @_;
389 0           my $host = $self->{hosts}{$label};
390 0 0         $debug and _debug(at => "[$label] at_connecting, waiting for master");
391 0           my $ssh = $host->{ssh};
392 0 0         if ($ssh->wait_for_master(1)) {
    0          
393 0 0         $debug and _debug(at => "[$label] at_connecting, master connected");
394 0           $self->_set_host_state($label, 'ready');
395             }
396             elsif ($ssh->error) {
397 0           $self->_at_error($label, $ssh->error);
398             }
399             }
400              
401             sub _join_notify {
402 0     0     my ($self, $label, $join, $failed) = @_;
403             # use Data::Dumper;
404             # print STDERR Dumper $join;
405 0 0         delete $join->{depends}{$label}
406             or die "internal error: $join->{id} notified for non dependent label $label";
407 0 0         $debug and _debug(join => "removing dependent $label from join $join->{id}");
408 0 0         $join->{failed} = 1 if $failed;
409 0 0         if (not %{$join->{depends}}) {
  0            
410 0 0         $debug and _debug(join => "join $join->{id} done");
411 0           $join->{done} = 1;
412 0           my $failed = $join->{failed};
413 0           for my $label (@{$join->{notify}}) {
  0            
414 0 0         $debug and _debug(join => "notifying $label about join $join->{id} done");
415 0 0         $self->_set_host_state($label, $failed ? 'join_failed' : 'ready');
416             }
417             }
418             # print STDERR Dumper $join;
419             }
420              
421             sub _num_workers {
422 0     0     my $in_state = shift->{in_state};
423 0           ( keys(%{$in_state->{ready}}) +
424 0           keys(%{$in_state->{running}}) +
425 0           keys(%{$in_state->{connecting}}) );
  0            
426             }
427              
428             sub _disconnect_host {
429 0     0     my ($self, $label) = @_;
430 0           my $host = $self->{hosts}{$label};
431 0           my $state = $host->{state};
432 0 0         $state =~ /^(?:waiting|suspended|done|connecting)$/
433             or die "internal error: disconnecting $label in state $state";
434 0 0         if ($host->{ssh}) {
435 0 0         $debug and _debug(connect => "[$label] disconnecting host");
436 0           my $master_pid = delete $host->{master_pid};
437 0 0         delete $self->{ssh_master_by_pid}{$master_pid}
438             if defined $master_pid;
439 0           undef $host->{ssh};
440 0           $self->{num_conns}--;
441 0           $self->_set_host_state($label, $state);
442             }
443             }
444              
445             sub _disconnect_any_host {
446 0     0     my $self = shift;
447 0           my $connected = $self->{connected};
448 0 0         $debug and _debug(conns => "disconnect any host");
449             # $self->_audit_conns;
450 0           my $label;
451 0           for my $state (qw(suspended join_failed waiting)) {
452             # use Data::Dumper;
453             # print Dumper $connected;
454 0 0         $debug and _debug(conns => "looking for connected host in state $state");
455 0           ($label) = each %{$connected->{$state}};
  0            
456 0           keys %{$connected->{$state}}; # reset iterator
  0            
457 0 0         last if defined $label;
458             }
459 0 0         $debug and _debug(conns => "[$label] disconnecting");
460 0 0         defined $label or die "internal error: unable to disconnect any host";
461 0           $self->_disconnect_host($label);
462             }
463              
464             my @private_opts = qw(on_error or_goto reconnections);
465              
466             sub _at_ready {
467 0     0     my ($self, $label) = @_;
468 0 0         if (my $max_workers = $self->{max_workers}) {
469 0           my $in_state = $self->{in_state};
470 0           my $num_workers = $self->_num_workers;
471 0 0         $debug and _debug(workers => "num workers: $num_workers, maximum: $max_workers");
472 0 0         if ($num_workers > $max_workers) {
473 0 0         $debug and _debug(workers => "[$label] suspending");
474 0           $self->_set_host_state($label, 'suspended');
475 0           return;
476             }
477             }
478              
479 0           my $host = $self->{hosts}{$label};
480 0 0         $debug and _debug(at => "[$label] at_ready");
481              
482 0           my $queue = $host->{queue};
483              
484 0 0         if ($self->{abort_all}) {
485 0           $self->_at_error($label, OSSH_ABORTED);
486 0           return;
487             }
488              
489 0           while (defined (my $task = shift @$queue)) {
490 0           my $action = shift @$task;
491 0 0         $debug and _debug(at => "[$label] at_ready, starting new action $action");
492 0 0         if ($action eq 'join') {
    0          
    0          
    0          
    0          
    0          
493 0           my (undef, $join) = @$task;
494 0 0         if ($join->{done}) {
495 0 0         $debug and _debug(join => "join[$join->{id}] is done");
496 0 0         if ($join->{failed}) {
497 0           $self->_at_error($label, OSSH_JOIN_FAILED);
498 0           return;
499             }
500 0 0         $debug and _debug(action => "[$label] join $join->{id} already done");
501 0           next;
502             }
503 0           CORE::push @{$join->{notify}}, $label;
  0            
504 0           $self->_set_host_state($label, 'waiting');
505 0           return;
506             }
507             elsif ($action eq 'here') {
508 0           next;
509             }
510             elsif ($action eq 'stop') {
511 0           $self->_skip($label, 'END');
512 0           next;
513             }
514             elsif ($action eq 'goto') {
515 0           my (undef, $target) = @$task;
516 0           $self->_skip($label, $target);
517 0           next;
518             }
519             elsif ($action eq '_notify') {
520 0           my (undef, $join) = @$task;
521 0           $self->_join_notify($label, $join);
522 0           next;
523             }
524             elsif ($action eq 'sub') {
525             # use Data::Dumper;
526             # _debug (action => Dumper(@$task));
527 0           shift @$task;
528 0           my $sub = shift @$task;
529 0 0         $debug and _debug(action => "[$label] calling sub $sub");
530 0           $sub->($self, $label, @$task);
531 0           next;
532             }
533             else {
534 0           my $ssh = $host->{ssh};
535 0 0 0       unless ($action eq 'parsub' and $task->[0]{no_ssh}) {
536 0 0         unless ($ssh) {
537             # unshift the task we have just removed and connect first:
538 0           unshift @$task, $action;
539 0           unshift @$queue, $task;
540 0 0         if (my $max_conns = $self->{max_conns}) {
541 0 0         $self->_disconnect_any_host() if $self->{num_conns} >= $max_conns;
542             }
543 0 0         $debug and _debug(at => "[$label] host is not connected, connecting...");
544 0           $self->_at_connect($label);
545 0           return;
546             }
547              
548 0 0         if (my $error = $ssh->error) {
549 0           $self->_at_error($label, $error);
550 0           return;
551             }
552             }
553              
554 0 0         next if $action eq 'connect';
555              
556 0           $host->{current_task} = [$action, @$task];
557 0           my %opts = %{shift @$task};
  0            
558 0           delete @opts{@private_opts};
559 0 0         my $method = $self->can("_start_$action")
560             or die "internal error: method _start_$action not found";
561 0           my $pid = $method->($self, $label, \%opts, @$task);
562 0 0         $debug and _debug(action => "[$label] action pid: ", $pid);
563 0 0         unless (defined $pid) {
564 0   0       my $error = (($ssh && $ssh->error) ||
565             dualvar(($action eq 'parsub'
566             ? "Unable to fork parsub"
567             : "Action $action failed to start"), OSSH_SLAVE_FAILED));
568 0           $self->_at_error($label, $error);
569 0           return;
570             }
571 0           $self->{host_by_pid}{$pid} = $label;
572 0           $self->_set_host_state($label, 'running');
573 0           return;
574             }
575             }
576 0 0         $debug and _debug(at => "[$label] at_ready, queue_is_empty, we are done!");
577 0           $self->_set_host_state($label, 'done');
578 0           $self->_disconnect_host($label);
579             }
580              
581             sub _start_parsub {
582 0     0     my $self = shift;
583 0           my $label = shift;
584 0           my $opts = shift;
585 0           my $sub = shift;
586 0           my $ssh = $self->{hosts}{$label}{ssh};
587 0 0         $debug and _debug(action => "[$label] start parsub action [@_]");
588 0           my $pid = fork;
589 0 0         unless ($pid) {
590 0 0         defined $pid or return;
591 0           eval { $sub->($label, $ssh, @_) };
  0            
592 0 0 0       $@ and $debug and _debug(error => "slave died on parsub: $@");
593 0 0         POSIX::_exit($@ ? 1 : 0);
594             }
595 0           $pid;
596             }
597              
598             sub _start_command {
599 0     0     my $self = shift;
600 0           my $label = shift;
601 0           my $opts = shift;
602 0           my $ssh = $self->{hosts}{$label}{ssh};
603 0 0         $debug and _debug(action => "[$label] start command action [@_]");
604 0           $ssh->spawn($opts, @_);
605             }
606              
607             sub _start_scp_get {
608 0     0     my $self = shift;
609 0           my $label = shift;
610 0           my $opts = shift;
611 0           my $ssh = $self->{hosts}{$label}{ssh};
612 0 0         $debug and _debug(action => "[$label] start scp_get action");
613 0           $opts->{async} = 1;
614 0           $ssh->scp_get($opts, @_);
615             }
616              
617             sub _start_scp_put {
618 0     0     my $self = shift;
619 0           my $label = shift;
620 0           my $opts = shift;
621 0           my $ssh = $self->{hosts}{$label}{ssh};
622 0 0         $debug and _debug(action => "[$label] start scp_put action");
623 0           $opts->{async} = 1;
624 0           $ssh->scp_put($opts, @_);
625             }
626              
627             sub _start_rsync_get {
628 0     0     my $self = shift;
629 0           my $label = shift;
630 0           my $opts = shift;
631 0           my $ssh = $self->{hosts}{$label}{ssh};
632 0 0         $debug and _debug(action => "[$label] start rsync_get action");
633 0           $opts->{async} = 1;
634 0           $ssh->rsync_get($opts, @_);
635             }
636              
637             sub _start_rsync_put {
638 0     0     my $self = shift;
639 0           my $label = shift;
640 0           my $opts = shift;
641 0           my $ssh = $self->{hosts}{$label}{ssh};
642 0 0         $debug and _debug(action => "[$label] start rsync_put action");
643 0           $opts->{async} = 1;
644 0           $ssh->rsync_put($opts, @_);
645             }
646              
647             # FIXME: dead code?
648             sub _start_join {
649 0     0     my $self = shift;
650 0           my $label = shift;
651 0           warn "internal mismatch: this shouldn't be happening!";
652             }
653              
654             sub _skip {
655 0     0     my ($self, $label, $target) = @_;
656 0 0         $debug and _debug(action => "skipping until $target");
657 0           my $host = $self->{hosts}{$label};
658 0           my $queue = $host->{queue};
659 0           my ($ix, $task);
660 0           for ($ix = 0; defined($task = $queue->[$ix]); $ix++) {
661 0 0 0       last if ($task->[0] eq 'here' and $task->[2] eq $target);
662             }
663 0 0 0       if ($task or $target eq 'END') {
664 0           for (1..$ix) {
665 0           my $task = shift @$queue;
666 0 0         $self->_join_notify($label, $task->[2])
667             if $task->[0] eq '_notify';
668             }
669 0           return;
670             }
671 0 0         $debug and _debug(action => "here label $target not found");
672 0           $self->_at_error($label, OSSH_GOTO_FAILED);
673             }
674              
675             sub _finish_task {
676 0     0     my ($self, $pid) = @_;
677 0           my $label = delete $self->{host_by_pid}{$pid};
678              
679 0 0         if (defined $label) {
680 0 0         $debug and _debug(action => "[$label] action finished pid: $pid, rc: $?");
681 0           my $host = $self->{hosts}{$label};
682 0           my $or_goto;
683 0 0         if ($?) {
684 0           my ($action) = @{$host->{current_task}};
  0            
685 0           my $rc = ($? >> 8);
686 0 0         my $ssh = $host->{ssh} or die "internal error: $label is not connected";
687 0           my $error = $ssh->error;
688 0 0 0       $or_goto = $host->{current_task}[1]{or_goto} unless ($error or $rc == 255);
689 0 0         if (defined $or_goto) {
690 0 0         $debug and _debug(action => "[$label] skipping to $or_goto with rc = $rc");
691             }
692             else {
693 0   0       $error ||= dualvar(OSSH_SLAVE_FAILED,
694             "child exited with non-zero return code ($rc)");
695 0           $self->_at_error($label, $error);
696             return
697 0           }
698             }
699 0           $self->_set_host_state($label, 'ready');
700 0 0         $self->_skip($label, $or_goto) if defined $or_goto;
701 0           delete $host->{current_task};
702 0           delete $host->{current_task_reconnections};
703             }
704             else {
705 0           my $label = delete $self->{ssh_master_by_pid}{$pid};
706 0 0         defined $label or carp "spurious child exit (pid: $pid)";
707              
708 0 0         $debug and _debug(action => "[$label] master ssh exited");
709 0           my $host = $self->{hosts}{$label};
710             my $ssh = $host->{ssh}
711 0 0         or die ("internal error: master ssh process exited but ".
712             "there is no ssh object associated to host $label");
713 0           $ssh->master_exited;
714 0           my $state = $host->{state};
715             # do error handler later...
716             }
717             }
718              
# Waits for child processes to change state and dispatches them.
#
# While some host is in the 'running' state, every child that waitpid()
# reports as finished (non-blocking, WNOHANG) is handed to
# $self->_finish_task($pid).
#
# Returns 1 immediately in either of these cases:
#   * $time is 0;
#   * the first reaping round already found a child.
# Otherwise it sleeps in select() for up to $time seconds, then reaps once
# more and returns 1. During the sleep a temporary no-op SIGCHLD handler is
# installed so that a child exit interrupts select() early.
#
# NOTE(review): a child exiting between the last waitpid() and the
# select() call is not noticed until select() times out, so in that case
# the full $time may elapse.
sub _wait_for_jobs {
    my ($self, $time) = @_;
    my $return_now = ($time == 0);
    $debug and _debug(at => "_wait_for_jobs time: $time");

    # waitpid() is polled both before and after select(): if the first
    # poll already reaps a child, select() is skipped entirely.
    while (1) {
        if (%{ $self->{in_state}{running} }) {
            $debug and _debug(at => "_wait_for_jobs reaping children");
            my $pid;
            while (($pid = waitpid(-1, WNOHANG)) > 0) {
                $debug and _debug(action => "waitpid caught pid: $pid, rc: $?");
                $return_now = 1;
                $self->_finish_task($pid);
            }
        }

        return 1 if $return_now;

        $debug and _debug(at => "_wait_for_jobs calling select");
        {
            # Any handler (even an empty one) makes SIGCHLD interrupt the
            # select() sleep, so a finished child wakes us early.
            local $SIG{CHLD} = sub { };
            select(undef, undef, undef, $time);
        }
        # After sleeping, reap once more and then return.
        $return_now = 1;
    }
}
748              
# Removes the 'error' entry from every registered host. Called at the
# start of run() so that errors from an earlier run do not linger.
sub _clean_errors {
    my ($self) = @_;
    for my $host (values %{ $self->{hosts} }) {
        delete $host->{error};
    }
}
753              
# Drives every registered host through its queued actions until all of
# them reach the 'done' state.
#
# Errors left over from a previous call are discarded first (via
# _clean_errors). Each iteration of the main loop then:
#   * lets hosts in the 'connecting' and 'ready' states make progress;
#   * reports OSSH_JOIN_FAILED through _at_error for hosts in the
#     'join_failed' state;
#   * when a worker limit (max_workers) is configured, wakes suspended
#     hosts while free worker slots remain;
#   * calls _wait_for_jobs with a timeout of:
#       - 0 s if any host is ready,
#       - 0.3 s if any host is still connecting,
#       - 5 s otherwise.
#
# Any extra arguments are ignored.
#
# Returns true when no host ended with a (true) error value and false
# otherwise. Use get_error/get_errors to inspect individual failures.
sub run {
    my ($self) = @_;

    $self->_clean_errors;

    my $hosts = $self->{hosts};
    my $max_workers = $self->{max_workers};
    my ($connecting, $ready, $running, $waiting, $suspended, $join_failed, $done) =
        @{$self->{in_state}}{qw(connecting ready running waiting suspended join_failed done)};

    # Hosts listed here are woken before the plain suspended ones;
    # presumably these are suspended hosts that keep a live connection.
    my $connected_suspended = $self->{connected}{suspended};

    while (1) {
        $debug and _debug(api => "run: iterating...");

        $debug and _debug(at => "run: hosts at connecting: ", scalar(keys %$connecting));
        $self->_at_connecting($_) for keys %$connecting;

        $debug and _debug(at => "run: hosts at ready: ", scalar(keys %$ready));
        $self->_at_ready($_) for keys %$ready;

        $debug and _debug(at => 'run: hosts at join_failed: ', scalar(keys %$join_failed));
        $self->_at_error($_, OSSH_JOIN_FAILED) for keys %$join_failed;

        if ($max_workers) {
            $debug and _debug(at => "run: hosts at suspended:", scalar(keys %$suspended));
            if (%$suspended) {
                # Number of additional workers that may be started now.
                my $awake = $max_workers - $self->_num_workers;
                for my $hash ($connected_suspended, $suspended) {
                    # Labels chosen from THIS hash only. Sharing one list
                    # across both passes would wake the first pass's hosts
                    # a second time.
                    my @labels;
                    while ($awake > 0) {
                        my ($label) = each %$hash or last;
                        CORE::push @labels, $label;
                        $awake--;
                    }
                    # The loop above can stop half-way through the hash.
                    # Reset the each() iterator so that:
                    #   * the next pass starts from the beginning;
                    #   * no iteration is in progress while
                    #     _set_host_state modifies the queues.
                    keys %$hash;
                    for my $label (@labels) {
                        $debug and _debug(workers => "[$label] awaking");
                        $self->_set_host_state($label, 'ready');
                    }
                }
            }
        }

        $debug and _debug(at => "run: hosts at waiting: ", scalar(keys %$waiting));
        $debug and _debug(at => "run: hosts at running: ", scalar(keys %$running));
        $debug and _debug(at => "run: hosts at done: ", scalar(keys %$done), " of ", scalar(keys %$hosts));

        last if keys(%$hosts) == keys(%$done);

        my $wait_time = ( %$ready      ? 0   :
                          %$connecting ? 0.3 :
                                         5.0 );
        $self->_wait_for_jobs($wait_time);
    }

    delete $self->{abort_all};

    my $error;
    for my $label (sort keys %$hosts) {
        $hosts->{$label}{error} and $error = 1;
        $debug and _debug(error => "[$label] error: ", $hosts->{$label}{error});
    }
    return !$error;
}
822              
# Returns the error currently stored for host $label, or undef when that
# host has none.
# Croaks with "no such host $label has been added" if $label is unknown.
sub get_error {
    my ($self, $label) = @_;
    my $host = $self->{hosts}{$label};
    croak "no such host $label has been added" unless $host;
    return $host->{error};
}
829              
# Reports the hosts that currently have a defined error (as returned by
# get_error).
#
# List context: returns (label => error, ...) pairs, ordered by label.
# Scalar context: returns the number of hosts with a defined error.
sub get_errors {
    my $self = shift;
    my @labels = sort keys %{ $self->{hosts} };

    unless (wantarray) {
        return scalar(grep { defined $self->get_error($_) } @labels);
    }

    my @pairs;
    for my $label (@labels) {
        my $error = $self->get_error($label);
        # CORE:: is explicit because this package defines its own push().
        CORE::push @pairs, $label => $error if defined $error;
    }
    return @pairs;
}
842              
843             1;
844              
845             __END__