File Coverage

blib/lib/RPC/ToWorker.pm
Criterion Covered Total %
statement 13 15 86.6
branch n/a
condition n/a
subroutine 5 5 100.0
pod n/a
total 18 20 90.0


line stmt bran cond sub pod time code
1              
2             package RPC::ToWorker;
3              
4 1     1   247587 use strict;
  1         3  
  1         89  
5 1     1   6 use warnings;
  1         2  
  1         59  
6             require Exporter;
7 1     1   1002 use File::Slurp::Remote::BrokenDNS qw($myfqdn %fqdnify);
  1         12370  
  1         158  
8 1     1   9 use Tie::Function::Examples qw(%q_perl);
  1         2  
  1         86  
9 1     1   1701 use IO::Event;
  0            
  0            
10             use IO::Event::Callback;
11             use Eval::LineNumbers qw(eval_line_numbers);
12             use Carp qw(confess);
13             use IO::Handle;
14             use Socket;
15             use IO::Event::Callback;
16             use Proc::Parallel::RemoteKiller;
17             use Scalar::Util qw(refaddr weaken);
18             use Time::HiRes qw(time);
19             require POSIX;
20              
21             our $VERSION = 0.601;
22              
23             our @EXPORT = qw(do_remote_job);
24             our @ISA = qw(Exporter);
25              
26             our $command = 'perl';
27              
28             our $max_retry = 10;
29              
30             my $timer_interval = 5;
31             my $reconnect_timeout = 7200;
32             my $listen_port = 28328;
33             my $listener;
34             my $poll_interval = 15;
35              
36             our $remote_killer;
37              
38             my %waiting;
39              
40             our $debug = 0;
41             our $debug_create = 0;
42              
43             my %forced_polling;
44             my $last_poll = 0;
45             our $doing_force_poll = 1;
46              
47             sub force_poll
48             {
49             # work around a bug in IO::Event or maybe Event
50             return if $doing_force_poll;
51             return $last_poll + $poll_interval < time;
52             local $doing_force_poll = 1;
53             $last_poll = time;
54              
55             print STDERR "--------------------------------- forced poll start ----------------------------\n";
56              
57             for my $a (keys %forced_polling) {
58             my $ioe = $forced_polling{$a};
59             if ($ioe) {
60             $ioe->ie_input();
61             } else {
62             delete $forced_polling{$a};
63             }
64             }
65              
66             print STDERR "--------------------------------- forced poll end ------------------------------\n";
67             }
68              
69             sub do_remote_job
70             {
71             my (%params) = @_;
72              
73             $params{can_retry} = 1 unless defined $params{can_retry};
74             my $can_retry = $params{can_retry};
75             my $host = $params{host};
76             my $when_done = $params{when_done} || confess "when_done is a required parameter";
77             my $data = $params{data};
78             my $chdir = $params{chdir} ||= '.';
79             my $eval = $params{eval};
80             my $desc = $params{desc} ||= "job on $host";
81             my $prefix = $params{prefix} ||= "$host:";
82             my $preload = $params{preload} ||= [];
83             my $prequel = $params{prequel} ||= '';
84             my $alldone = bless { %params }, 'RPC::ToWorker::AllDone';
85             my $status = $params{status} ||= sub { 0; };
86             $params{failure} ||= sub {
87             print STDERR "DIE DIE DIE DIE DIE: $desc: @_";
88             # exit 1; hangs!
89             POSIX::_exit(1);
90             };
91             my $died_at = $params{died_at} ||= $params{failure};
92              
93             $params{alldone} = $alldone;
94              
95             $preload = [ split(' ', $preload) ] unless ref $preload;
96              
97             while(! $listener) {
98             $listener = IO::Event::Socket::INET->new(
99             Listen => 100,
100             Proto => 'tcp',
101             LocalPort => ++$listen_port,
102             );
103             unless ($listener) {
104             warn "# Cannot listen on port $listen_port: $!";
105             redo;
106             }
107             my $timer = IO::Event->timer(
108             interval => $timer_interval,
109             cb => sub {
110             for my $e (keys %waiting) {
111             my $r = $waiting{$e};
112             next if time < $r->{start_time} + $reconnect_timeout;
113             next if $r->{alldone}{failed};
114             if ($r->{alldone}{compile_finished} && $can_retry && ! $r->{alldone}{master_go} && $can_retry < $max_retry) {
115             $r->{alldone}->{retrying} = 1;
116             $r->{alldone}{failed} = "Timed out, retrying";
117             my %new = %$r;
118             delete $new{alldone};
119             $new{can_retry}++;
120             $new{desc} = "RETRY$new{can_retry} $new{desc}";
121             do_remote_job(%new);
122             print STDERR "RETRYING REMOTE JOB $desc\n";
123             } else {
124             $r->{failure}->("Timed out waiting for job $desc on $host to connect to $listen_port for cookie $e");
125             }
126             }
127             RPC::ToWorker::force_poll();
128             },
129             );
130              
131             # $listener->event->prio(1);
132              
133             $remote_killer = Proc::Parallel::RemoteKiller->new();
134             }
135              
136             my $slavefh = new IO::Handle;
137             my $parentfh = new IO::Handle;
138              
139             socketpair($slavefh, $parentfh, AF_UNIX, SOCK_STREAM, PF_UNSPEC)
140             or die "socketpair: $!";
141             my $pid = fork();
142             my $slave;
143             if ($pid) {
144             # parent
145             $parentfh->close();
146             $slavefh->blocking(0);
147             $slavefh->autoflush(1);
148             $slave = IO::Event::Callback->new($slavefh,
149             werror => sub {
150             $params{failure}->("Could not write to stdin for $desc: $!");
151             $alldone->{failed}->("Could not write to stdin for $desc: $!");
152             },
153             input => sub {
154             my ($self, $ioe) = @_;
155             while (<$ioe>) {
156             if (/^SLAVE PID=(\d+)\n/) {
157             $remote_killer->note($host, $1);
158             $params{pid} = $1;
159             $alldone->{slavepid} = $1;
160             next;
161             } elsif (/^compile finished\./) {
162             $alldone->{compile_finished} = 1;
163             next;
164             }
165             if ($params{output_handler}) {
166             $params{output_handler}->($_);
167             } else {
168             print STDERR "$prefix SSH/ERROR: $_";
169             }
170             }
171             RPC::ToWorker::force_poll();
172             },
173             eof => sub {
174             my ($self, $ioe) = @_;
175             $ioe->close();
176             $params{output_handler}->("EOF on ssh ($desc)\n") if $debug_create;
177             },
178             );
179             $params{output_handler}->("startup ($desc)\n") if $debug_create;
180             } elsif (defined $pid) {
181             # child
182             $slavefh->close();
183             $parentfh->autoflush(1);
184             $parentfh->blocking(0);
185             print $parentfh "Foo!\n" if $debug;
186             open STDIN, "<&", \$parentfh or die "dup onto STDIN: $!";
187             open STDOUT, ">&", \$parentfh or die "dup onto STDOUT: $!";
188             open STDERR, ">&", \$parentfh or die "dup onto STDERR: $!";
189             if (0 && $fqdnify{$host} eq $myfqdn) { # XXX why is this not reliable?
190             exec $command
191             or die "exec $command: $!";
192             } else {
193             exec 'ssh', $host, '-o', 'StrictHostKeyChecking=no', '-o', 'BatchMode=yes', $command,
194             or do {
195             $params{failure}->("exec ssh $host $command: $!");
196             return;
197             };
198             }
199             } else {
200             die "cannot fork: $!";
201             }
202              
203             my $cookie;
204             do {
205             $cookie = "C".rand(100000000);
206             } while defined $waiting{"$cookie MASTER"};
207              
208             $waiting{"$cookie MASTER"} = bless {
209             slave => $slave,
210             start_time => time,
211             %params,
212             }, 'RPC::ToWorker::Master';
213              
214             $waiting{"$cookie OUTPUT"} = bless {
215             slave => $slave,
216             start_time => time,
217             %params,
218             }, 'RPC::ToWorker::Output';
219              
220             my $stream = '';
221              
222             if ($params{stream_in} || $params{stream_out}) {
223             $stream = eval_line_numbers(<
224             my \$stream = new IO::Socket::INET (
225             PeerAddr => '$myfqdn:$listen_port',
226             Proto => 'tcp',
227             );
228             die "Could not connect to master at $myfqdn:$listen_port: \$!" unless \$stream;
229             \$stream->autoflush(1);
230             print \$stream "$cookie STREAM\\n"
231             or die;
232             END_STREAM
233              
234             $waiting{"$cookie STREAM"} = bless {
235             slave => $slave,
236             start_time => time,
237             %params,
238             }, 'RPC::ToWorker::Stream';
239             }
240              
241             my $pre = '';
242             $pre .= "use $_; " for @$preload;
243              
244             my $p5lib = $ENV{PERL5LIB} || '';
245              
246             my $av0 = "slave for $myfqdn:$$ - $desc: ";
247              
248             print $slave eval_line_numbers(<
249             \$0 = $q_perl{$av0} . 'starting';
250             use strict;
251             use warnings;
252             BEGIN {
253             print "SLAVE PID=\$\$\\n";
254             chdir($q_perl{$chdir}) or die "cannot chdir to $chdir on $host: \$!";
255             unshift(\@INC, split(':', $q_perl{$p5lib}));
256             }
257             END_SLAVE0
258              
259             print $slave $prequel;
260             print $slave "\n";
261             print $slave eval_line_numbers(<
262             use IO::Socket::INET;
263             use Storable qw(freeze thaw);
264             $pre
265             END_SLAVE1
266             print $slave eval_line_numbers(<
267              
268             if ($debug) {
269             open(DEBUG, ">&STDERR") or die "dup STDERR: $!";
270             print STDERR "Dup to DEBUG should have worked\\n";
271             select(DEBUG);
272             \$| = 1;
273             select(STDOUT);
274             printf DEBUG "debug test %d\\n", __LINE__;
275             }
276              
277             \$0 = $q_perl{$av0} . 'redirecting STDOUT';
278              
279             my \$output = new IO::Socket::INET (
280             PeerAddr => '$myfqdn:$listen_port',
281             Proto => 'tcp',
282             );
283             die "Could not connect to master at $myfqdn:$listen_port: \$!" unless \$output;
284             \$output->autoflush(1);
285             print \$output "$cookie OUTPUT\\n";
286              
287             if ($debug) {
288             printf DEBUG "debug test %d\\n", __LINE__;
289             print STDERR "Connected for Output\\n";
290             printf DEBUG "debug test %d\\n", __LINE__;
291             print "Output connected\\n";
292             printf DEBUG "debug test %d\\n", __LINE__;
293             print \$output "test foo\\n";
294             printf DEBUG "debug test %d\\n", __LINE__;
295             }
296              
297             open STDOUT, ">&", \$output or die "dup to STDOUT: \$!";
298             select STDOUT;
299             \$| = 1;
300              
301             if ($debug) {
302             print STDERR "stderr test\\n";
303             printf DEBUG "debug test %d\\n", __LINE__;
304             }
305              
306             \$0 = $q_perl{$av0} . 'connecting STREAM';
307              
308             $stream
309              
310             \$0 = $q_perl{$av0} . 'setting up MASTER';
311              
312             my \$master = new IO::Socket::INET (
313             PeerAddr => '$myfqdn:$listen_port',
314             Proto => 'tcp',
315             );
316             die "Could not connect to master at $myfqdn:$listen_port: \$!" unless \$master;
317             \$master->autoflush(1);
318             printf DEBUG "debug test %d\\n", __LINE__ if $debug;
319             print \$master "$cookie MASTER\\n"
320             or die;
321             printf DEBUG "debug test %d\\n", __LINE__ if $debug;
322              
323             \$0 = $q_perl{$av0} . 'looking for "go" from master';
324              
325             my \$go = <\$master>;
326              
327             chomp(\$go);
328             exit 1 if \$go eq 'suicide';
329              
330             \$go =~ /go (\\d+)/;
331             my \$amt = \$1;
332             die unless \$amt;
333              
334             \$0 = $q_perl{$av0} . 'downloading initial data';
335              
336             my \$buf = '';
337              
338             while (length(\$buf) < \$amt) {
339             read(\$master, \$buf, \$amt - length(\$buf), length(\$buf)) or die;
340             }
341              
342             printf DEBUG "debug test %d\\n", __LINE__ if $debug;
343              
344             \$0 = $q_perl{$av0} . 'reconstituting initial data';
345              
346             my \$data = \${thaw(\$buf)};
347              
348             \$RPC::ToWorker::Callback::master = # suppress used-once warning
349             \$RPC::ToWorker::Callback::master = \$master;
350              
351             printf DEBUG "debug test %d\\n", __LINE__ if $debug;
352             END_SLAVE2
353             print $slave eval_line_numbers(<
354              
355             \$0 = $q_perl{$av0} . 'RUNNING';
356             my \@r;
357              
358             printf DEBUG "debug test %d\\n", __LINE__ if $debug;
359             eval {
360             sub slave_eval {
361             $eval
362             }
363             \@r = slave_eval(\$data);
364             };
365             printf DEBUG "debug test %d\\n", __LINE__ if $debug;
366              
367             if (\$\@) {
368             \$0 = $q_perl{$av0} . 'returning failure';
369             print STDERR \$\@;
370             my \$err = freeze(\\\$\@);
371             printf \$master "DATA %d RETURN_ERROR\\n%s", length(\$err), \$err;
372             # exit 1; hangs
373             POSIX::_exit(1);
374             }
375              
376             \$0 = $q_perl{$av0} . 'returning results';
377              
378             my \$ret = freeze(\\\@r);
379             printf \$master "DATA %d RETURN_VALUES\\n%s", length(\$ret), \$ret;
380              
381             \$0 = $q_perl{$av0} . 'exiting';
382              
383             exit;
384              
385             BEGIN { print STDERR "compile finished.\\n" }
386             END_SLAVE3
387             shutdown($slavefh, 1); # done writing
388             }
389              
390             sub ie_connection
391             {
392             my ($pkg, $ioe) = @_;
393             print STDERR "# GOT CONNECTION\n" if $RPC::ToWorker::debug;
394             my $newfh = $ioe->accept();
395             # $newfh->event->prio(1);
396             $forced_polling{refaddr($newfh)} = $newfh;
397             weaken($forced_polling{refaddr($newfh)});
398             RPC::ToWorker::force_poll();
399             }
400              
401             sub ie_input
402             {
403             my ($self, $ioe) = @_;
404             my $cookie = <$ioe>;
405             return unless $cookie;
406             chomp($cookie);
407             print STDERR "# GOT COOKIE $cookie\n" if $debug;
408             unless ($waiting{$cookie}) {
409             warn "Unknown cookie '$cookie'";
410             next;
411             }
412             $ioe->handler($waiting{$cookie});
413             # $ioe->event->prio(4);
414             my $o = $waiting{$cookie};
415             $o->{output_handler}->(sprintf("using fd %d for $cookie (%s)\n", $ioe->fileno, $o->{desc})) if $RPC::ToWorker::debug_create;
416             $waiting{$cookie}->send_initial_data($ioe);
417             delete $waiting{$cookie};
418             RPC::ToWorker::force_poll();
419             }
420              
421             sub ie_eof
422             {
423             my ($self, $ioe) = @_;
424             $ioe->close();
425             }
426              
427              
428             package RPC::ToWorker::Master;
429              
430             #
431             # This is on the master
432             #
433              
434             use strict;
435             use warnings;
436             use Storable qw(freeze thaw);
437             use Module::Load qw(load);
438              
439             sub send_initial_data
440             {
441             my ($self, $ioe) = @_;
442              
443             if ($self->{alldone}{retrying}) {
444             print $ioe "suicide\n";
445             } else {
446             $self->{alldone}{master_go} = 1;
447              
448             my $id = freeze(\($self->{data} || undef));
449              
450             printf $ioe "go %d\n", length($id); # don't suicide
451              
452             print $ioe $id;
453             print STDERR "# DATA SENT\n" if $RPC::ToWorker::debug;
454             }
455             }
456              
457             sub ie_input
458             {
459             my ($self, $ioe, $ibr) = @_;
460             $self->{output_handler}->("control socket input ready ($self->{desc})\n") if $RPC::ToWorker::debug_create;
461             while ($$ibr =~ /\A(DATA (\d+) ([^\n]+)\n)/ && length($$ibr) - length($1) >= $2) {
462             my ($header, $dsize, $control) = ($1, $2, $3);
463             my $data = thaw(substr($$ibr, length($header), $dsize));
464             substr($$ibr, 0, length($header) + $dsize, '');
465             if ($control =~ /^RETURN_VALUES$/) {
466             $self->{output_handler}->("return values sent - $dsize ($self->{desc})\n") if $RPC::ToWorker::debug_create;
467             eval {
468             $self->{when_done}->(@$data);
469             };
470             $self->{failure}->("when done for $self->{desc}: $@") if $@;
471             $self->{return_values_sent} = 1;
472             $ioe->close();
473             } elsif ($control =~ /^RETURN_ERROR$/) {
474             my $error = $$data;
475             $self->{failure}->("SLAVE FAILURE: $error");
476             $self->{alldone}{failured} = "SLAVE FAILURE: $error";
477             $self->{output_handler}->("return error ($self->{desc})\n") if $RPC::ToWorker::debug_create;
478             } elsif ($control =~ /^CALL (\S+) with (.*?) after loading (.*)/) {
479             my ($func, $with, $mods) = ($1, $2, $3);
480             for my $mod (split(' ', $mods)) {
481             load $mod;
482             }
483             for my $item (split(' ',$with)) {
484             push(@$data, $item);
485             push(@$data, $self->{local_data}{$item});
486             }
487             my @ret;
488             eval {
489             no strict 'refs';
490             @ret = &{$func}(@$data);
491             };
492             $self->{failure}->("call to $func on behalf of $self->{desc}: $@") if $@;
493             my $ret = freeze(\@ret);
494             printf $ioe "DATA %d DONE_RESPONSE\n%s", length($ret), $ret or die;
495             } else {
496             $self->{failure}->("SLAVE FAILURE: could not parse input from slave");
497             $self->{alldone}{failured} = "SLAVE FAILURE: could not parse input from slave";
498             $self->{output_handler}->("return parse error ($self->{desc})\n") if $RPC::ToWorker::debug_create;
499             }
500             }
501             RPC::ToWorker::force_poll();
502             }
503              
504             sub ie_werror
505             {
506             my ($self, $ioe) = @_;
507             return if $self->{alldone}{retrying};
508             IO::Event->timer(
509             after => 5,
510             cb => sub { $self->{failure}->("Could not write to control socket for: $self->{desc}") },
511             );
512             print STDERR "Failed: Could not write to control socket for job: $self->{desc}, will suicide soon, after queued output has chance to print\n";
513             $self->{alldone}{failured} = "Could not write to control socket for job: $self->{desc}";
514             $self->{output_handler}->("Write error on control socket ($self->{desc})\n") if $RPC::ToWorker::debug_create;
515             }
516              
517             sub ie_eof
518             {
519             my ($self, $ioe) = @_;
520              
521             $self->{output_handler}->("EOF on control socket ($self->{desc})\n") if $RPC::ToWorker::debug_create;
522             $ioe->close();
523              
524             return if $self->{return_values_sent};
525             return if $self->{alldone}{retrying};
526              
527             IO::Event->timer(
528             after => 5,
529             cb => sub { $self->{failure}->("No return values from remote job: $self->{desc}") },
530             );
531             print STDERR "Failed: no return values from remote job: $self->{desc}, will suicide soon, after queued output has chance to print\n";
532             $self->{alldone}{failured} = "No return values from remote job: $self->{desc}";
533             }
534              
535             package RPC::ToWorker::Output;
536              
537             use strict;
538             use warnings;
539              
540             sub send_initial_data
541             {
542             my ($self, $ioe) = @_;
543             shutdown($ioe->filehandle(), 1); # we don't write to this one
544             # $ioe->event->prio(6);
545             }
546              
547             sub ie_input
548             {
549             my ($self, $ioe) = @_;
550             while (<$ioe>) {
551             next if /ssh_exchange_identification: Connection closed by remote host/;
552             if ($self->{output_handler}) {
553             $self->{output_handler}->($_)
554             } else {
555             print STDERR "$self->{prefix} OUTPUT: $_";
556             }
557             }
558             RPC::ToWorker::force_poll();
559             }
560              
561             sub ie_eof
562             {
563             my ($self, $ioe) = @_;
564             $ioe->close();
565             $self->{output_handler}->("EOF on output socket ($self->{desc})\n") if $RPC::ToWorker::debug_create;
566             }
567              
568             package RPC::ToWorker::Stream;
569              
570             use strict;
571             use warnings;
572             use Storable qw(freeze thaw);
573              
574             sub send_initial_data
575             {
576             my ($self, $ioe) = @_;
577             $self->{stream_werror} ||= sub {
578             my ($self, $ioe) = @_;
579             IO::Event::unloop_all();
580             die "Write error sending data to $self->{desc}: $!";
581             };
582             for my $h (@IO::Event::Callback::handlers, 'setup') {
583             $self->{"stream_$h"} ||= sub {};
584             }
585             $self->{'stream_setup'}->($self, $ioe);
586             # $ioe->event->prio(5);
587             }
588              
589             sub ie_input { $_[0]->{'stream_input'}->(@_) };
590             sub ie_connection { $_[0]->{'stream_connection'}->(@_) };
591             sub ie_read_ready { $_[0]->{'stream_read_ready'}->(@_) };
592             sub ie_werror { $_[0]->{'stream_werror'}->(@_) };
593             sub ie_eof { $_[0]->{'stream_eof'}->(@_) };
594             sub ie_output { $_[0]->{'stream_output'}->(@_) };
595             sub ie_outputdone { $_[0]->{'stream_outputdone'}->(@_) };
596             sub ie_connected { $_[0]->{'stream_connected'}->(@_) };
597             sub ie_connect_failed { $_[0]->{'stream_connect_failed'}->(@_)};
598             sub ie_died { $_[0]->{'stream_died'}->(@_) };
599             sub ie_timer { $_[0]->{'stream_timer'}->(@_) };
600             sub ie_exception { $_[0]->{'stream_exception'}->(@_) };
601             sub ie_outputoverflow { $_[0]->{'stream_outputoverflow'}->(@_)};
602              
603             package RPC::ToWorker::AllDone;
604              
605             use strict;
606             use warnings;
607              
608             sub DESTROY
609             {
610             my ($self) = @_;
611             $self->{failure}->($self->{failed}) if $self->{failed} && ! $self->{retrying};
612             $RPC::ToWorker::remote_killer->forget($self->{host}, $self->{slavepid})
613             if $self->{slavepid};
614             $self->{all_done}->() if $self->{all_done};
615             $RPC::ToWorker::remote_killer->forget($self->{host}, $self->{pid})
616             if $self->{pid};
617             RPC::ToWorker::force_poll();
618             $self->{output_handler}->("Alldone on ($self->{desc})\n") if $RPC::ToWorker::debug_create;
619             }
620              
621             1;
622              
623             __END__