File Coverage

blib/lib/Test/MultiFork.pm
Criterion Covered Total %
statement 7 9 77.7
branch n/a
condition n/a
subroutine 3 3 100.0
pod n/a
total 10 12 83.3


line stmt bran cond sub pod time code
1              
2             package Test::MultiFork;
3              
4 7     7   11773 use Filter::Util::Call ;
  7         8723  
  7         464  
5 7     7   5980 use Event;
  7         92470  
  7         45  
6 7     7   33688 use IO::Event;
  0            
  0            
7             use IO::Handle;
8             use Storable qw(freeze thaw);
9             require POSIX;
10             use Socket;
11             require Exporter;
12             use Time::HiRes qw(sleep);
13             use Carp;
14              
15             #print STDERR "IOE V: $IO::Event::VERSION\n";
16              
17             $VERSION = 0.6;
18              
19             @ISA = qw(Exporter);
20             @EXPORT = qw(procname lockcommon unlockcommon getcommon setcommon);
21             @EXPORT_OK = (@EXPORT, qw(groupwait setgroup dofork));
22              
23             use strict;
24             use warnings;
25              
26             # server side
27             my $stderr;
28             my $colorize;
29             my %capture;
30             my %control;
31             my $sequence = 1;
32             my $commonlock; # current holder of lock
33             my @commonwait; # waiting for lock
34             my $common = freeze([]);
35             my %groups;
36             my $timer;
37             my $bialout;
38             my $ret = '';
39             my $bailonbadplan = 0;
40              
41             our $inactivity;
42             $inactivity ||= 5;
43              
44             # client side
45             my $server;
46             my $newstdout;
47             my $letter;
48             my $number;
49             my $name;
50             my $lockdepth = 0;
51             my $group = 'default';
52             my $waiting;
53              
54             # debugging
55              
56             our $debug_common = 0;
57             our $debug_groupwait = 0;
58              
59             # constants
60              
61             my $pkg = __PACKAGE__;
62             my %color = (
63             black => 0,
64             red => 1,
65             green => 2,
66             yellow => 3,
67             blue => 4,
68             magenta => 5,
69             cyan => 6,
70             white => 7,
71             default => 9,
72             );
73             my %color_bg = (
74             a => $color{black},
75             b => $color{blue},
76             c => $color{green},
77             d => $color{red},
78             e => $color{cyan},
79             f => $color{magenta},
80             g => $color{yellow},
81             );
82             my @color_fg = (
83             'notused',
84             $color{white},
85             $color{yellow},
86             $color{cyan},
87             $color{magenta},
88             $color{red},
89             $color{green},
90             $color{blue},
91             $color{black},
92             );
93              
94             # shared
95             my $signal;
96              
97             sub import
98             {
99             my $pkg = shift;
100             my @ia;
101             for my $ia (@_) {
102             if ($ia eq 'stderr') {
103             $stderr = 1;
104             } elsif ($ia eq 'colorize') {
105             $colorize = ($ENV{TERM} =~ /xterm/) && ! $ENV{HARNESS_ACTIVE};
106             } elsif ($ia eq 'bail_on_bad_plan') {
107             $bailonbadplan = 1;
108             } else {
109             push(@ia, @_);
110             }
111             }
112             filter_add(bless [], $pkg);
113             $pkg->export_to_level(1, @ia);
114             }
115              
116             sub dofork
117             {
118             my ($spec) = @_;
119              
120             while($spec) {
121             $spec =~ s/^([a-z])(\d*)// || confess "illegal fork spec";
122             my $l = $1;
123             my $count = $2 || 1;
124             for my $n (1..$count) {
125             my $pid;
126             my $psideCapture = new IO::Handle;
127             my $psideControl = new IO::Handle;
128             $server = new IO::Handle;
129             $newstdout = new IO::Handle;
130             socketpair($psideCapture, $newstdout, AF_UNIX, SOCK_STREAM, PF_UNSPEC)
131             || confess "socketpair: $!";
132             socketpair($psideControl, $server, AF_UNIX, SOCK_STREAM, PF_UNSPEC)
133             || confess "socketpair: $!";
134             if ($pid = fork()) {
135             # parent
136             #sleep(0.1);
137             $server->close();
138             $newstdout->close();
139              
140             if (0 && 'CRAZY_STUFF') {
141             use IO::Pipe;
142             my $pipe = new IO::Pipe;
143              
144             if (fork()) {
145             $newstdout->close();
146             $pipe->reader();
147             $psideCapture = $pipe;
148             } else {
149             $pipe->writer();
150             my $fn = $pipe->fileno();
151             open(STDOUT, ">&=$fn") || confess "redirect stdout2: $!";
152             $fn = $psideCapture->fileno();
153             open(STDIN, "<&=", $fn) || confess "redirect stdin: $!";
154             exec("tee bar.$$") || confess "exec: $!";
155             }
156             }
157              
158             Test::MultiFork::Control->new($psideControl, $l, $n, $pid);
159             Test::MultiFork::Capture->new($psideCapture, $l, $n);
160              
161             } elsif (defined $pid) {
162             # child
163             $letter = $l;
164             $number = $n;
165             $name = "$l-$n";
166              
167             $psideCapture->close();
168             $psideControl->close();
169             for my $c (keys %capture) {
170             $capture{$c}{ie}->close();
171             delete $capture{$c};
172             }
173             for my $c (keys %control) {
174             $control{$c}{ie}->close();
175             delete $control{$c};
176             }
177              
178             if (0 && 'CRAZY_STUFF') {
179             use IO::Pipe;
180             my $pipe = new IO::Pipe;
181              
182             if (fork()) {
183             $newstdout->close();
184             $pipe->writer();
185             $newstdout = $pipe;
186             } else {
187             my $fn = $newstdout->fileno();
188             open(STDOUT, ">&=$fn") || confess "redirect stdout2: $!";
189             $pipe->reader();
190             $fn = $pipe->fileno();
191             #close(STDIN);
192             open(STDIN, "<&=", $fn) || confess "redirect stdin: $!";
193             exec("tee foo.$$") || confess "exec: $!";
194             }
195             }
196              
197             $newstdout->autoflush(1);
198             $server->autoflush(1);
199             if (defined &Test::Builder::new) {
200             my $tb = new Test::Builder;
201             $tb->output($newstdout);
202             $tb->todo_output($newstdout);
203             $tb->failure_output($newstdout);
204             }
205             my $fn = $newstdout->fileno();
206             open(STDOUT, ">&=$fn") || confess "redirect stdout: $!";
207             autoflush STDOUT 1;
208             if ($stderr) {
209             open(STDERR, ">&=$fn") || confess "redirect stdout: $!";
210             autoflush STDERR 1;
211             }
212            
213             $SIG{$signal} = \&lastrites
214             if $signal;
215              
216             $waiting = "for initial begin";
217             my $x = <$server>;
218             confess unless $x eq "begin\n";
219             undef $waiting;
220              
221             return;
222             } else {
223             confess "Can't fork: $!";
224             }
225             }
226             }
227             #print "about to create timer\n";
228             $timer = Test::MultiFork::Timer->new();
229              
230             #print "sending begin\n";
231             for my $control (values %control) {
232             $control->{fh}->print("begin\n");
233             }
234              
235             # exit on die
236             $Event::DIED = sub {
237             Event::verbose_exception_handler(@_);
238             Event::unloop_all();
239             };
240              
241             #print "event loop\n";
242             if (Event::loop() == 7.3) {
243             # great
244             notokay(0, '', '', '', "clean shutdown");
245             } else {
246             notokay(1, '', '', '', "event loop timeout");
247             }
248             $sequence--;
249             print "\n1..$sequence\n";
250             exit(0);
251             }
252              
253             sub groupwait
254             {
255             my ($tag) = @_;
256             my (undef, $filename, $line) = caller;
257             $tag = "$filename:$line" unless $tag;
258             print $server "waitforgroup $tag\n";
259             $waiting = "for go-ahead after a group wait ($group)";
260             my $go = <$server>;
261             confess "go='$go' (not 'go\\n')" unless $go eq "go\n";
262             undef $waiting;
263             }
264              
265             sub procname
266             {
267             my $oname = $name;
268             if (@_) {
269             $name = $_[0];
270             confess if $name =~ /\n/;
271             print $server "setname $name\n";
272             }
273             return ($name, $letter, $number) if wantarray;
274             return $name;
275             }
276              
277             sub setgroup
278             {
279             my $og = $group;
280             if (@_) {
281             $group = $_[0];
282             confess if $group =~ /\n/;
283             print $server "setgroup $group\n";
284             }
285             return $og;
286             }
287              
288             sub lockcommon
289             {
290             print STDERR "\n[$letter-$number] locking common -request\n" if $debug_common;
291             unless ($lockdepth++) {
292             print $server "lock common\n";
293             $waiting = "lock on common data";
294             my $youhavelock = <$server>;
295             confess unless $youhavelock eq "youhavelock\n";
296             undef $waiting;
297             }
298             print STDERR "\n[$letter-$number] locking common done\n" if $debug_common;
299             }
300              
301             sub unlockcommon
302             {
303             print STDERR "\n[$letter-$number] unlocking common -request\n" if $debug_common;
304             unless (--$lockdepth) {
305             print $server "unlock common\n";
306             undef $common;
307             }
308             if ($lockdepth < 0) {
309             warn "common already unlocked";
310             $lockdepth = 0;
311             }
312             print STDERR "\n[$letter-$number] unlocking common done\n" if $debug_common;
313             }
314              
315             sub getcommon
316             {
317             print STDERR "\n[$letter-$number] get common - request\n" if $debug_common;
318             print $server "get common\n";
319             $waiting = "to get size of common data";
320             my $size = <$server>;
321             $waiting = "for common data";
322             my $buf;
323             my $amt = read($server, $buf, $size);
324             confess unless $amt == $size;
325             undef $waiting;
326             my $r = thaw($buf);
327             print STDERR "\n[$letter-$number] get common done\n" if $debug_common;
328             return @$r if wantarray;
329             return $r->[0];
330             }
331              
332             sub setcommon
333             {
334             print STDERR "\n[$letter-$number] set common -request\n" if $debug_common;
335             my $x = freeze([@_]);
336             print $server "set common\n";
337             print $server length($x)."\n";
338             print $server $x;
339             print STDERR "\n[$letter-$number] set common done\n" if $debug_common;
340             }
341              
342             sub notokay
343             {
344             my ($not, $name, $letter, $n, $comment) = @_;
345             $not = $not ? "not " : "";
346             $name = " - $name" unless $name =~ /^\s*-/;
347             $comment = "" unless defined $comment;
348             cprint($letter, $n, "${not}ok $sequence $name # $comment\n");
349             $sequence++;
350             }
351              
352             sub lastrites
353             {
354             if ($waiting) {
355             print STDERR "\nSERVER WAIT $name $number-$letter: $waiting";
356             }
357             confess;
358             }
359              
360             sub cprint
361             {
362             my $letter = shift;
363             my $n = shift;
364             if ($colorize && $letter) {
365             my $fg = $color_fg[$n] || 7;
366             my $bg = $color_bg{$letter} || 0;
367             $fg = 7 if $bg == $fg;
368             print "\x9b3${fg}m\x9b4${bg}m";
369             print @_;
370             print "\x9b39m\x9b49m";
371             } else {
372             print @_;
373             }
374             }
375              
376             sub filter {
377             my ($self) = @_;
378              
379             my @new;
380             while (filter_read() > 0) {
381             push(@new, $_);
382             $_ = '';
383             }
384             if (@new) {
385             my %procs;
386             my $insub = '';
387             my $active = 1;
388             my $fork;
389              
390             for (@new) {
391             if (s/^(FORK_([a-z0-9]+):\s*)$/## $1/) {
392             confess "only one FORK_ allowed" if $fork;
393             $fork = $2;
394             } elsif (s/^(SIGNAL_(\w*):\s*)$/## $1/) {
395             confess "only one SIGNAL_ allowed" if defined $signal;
396             $signal = $2;
397             }
398             }
399             $signal = 'USR1' unless defined $signal;
400             $signal = '' if $signal eq 'none';
401            
402             if (defined $fork) {
403             dofork($fork);
404              
405             while (@new) {
406             $_ = shift @new;
407              
408             if (/^sub\s+\w+(?!.*?;)/) {
409             $insub = 1; # {
410             } elsif (/^}/) {
411             $insub = 0;
412             }
413              
414             if (/^([a-z]+):/) {
415             my $sets = $1;
416             $active = (($sets =~ /$letter/o) ? 1 : 0);
417             unless ($insub) {
418             push(@$self, "${pkg}::groupwait();\n")
419             } else {
420             push(@$self, "#$insub# $_");
421             }
422             } elsif ($active) {
423             push(@$self, $_);
424             } else {
425             push(@$self, "#$insub# $_");
426             }
427             }
428             } else {
429             # no builtin fork
430             @$self = @new;
431             }
432             #print "SOURCE: @$self\n DONE\n";
433             }
434             return 0 unless @$self;
435             $_ = shift @$self;
436             return 1;
437             }
438              
439             package Test::MultiFork::Timer;
440              
441             use Carp;
442             use strict;
443             use warnings;
444              
445             sub new
446             {
447             my ($pkg) = @_;
448             my $self = bless { }, $pkg;
449              
450             $self->{event} = Event->timer(
451             cb => [ $self, 'timeout' ],
452             interval => $inactivity,
453             hard => 0,
454             );
455             return $self;
456             }
457              
458             sub timeout
459             {
460             print STDERR "\nBail out! Timeout in Test::MultiFork\n";
461              
462             for my $c (values %control) {
463             my $x = ($c->{name} eq $c->{code}) ? $c->{name} : "$c->{name} ($c->{code})";
464             my $y = $c->{status}
465             ? $c->{status}
466             : ($c->{waiting}
467             ? "waiting for $c->{group} for $c->{waiting}"
468             : ($c->{lockstatus}
469             ? $c->{lockstatus}
470             : "idle"));
471             my @z;
472             my $e = $c->{ie}->event;
473             push(@z, "cancelled") if $e->is_cancelled;
474             push(@z, "active") if $e->is_active;
475             push(@z, "running") if $e->is_running;
476             push(@z, "suspended") if $e->is_suspended;
477             push(@z, "pending") if $e->pending;
478             print STDERR "$x: $y (event status: @z)\n";
479             if ($signal) {
480             kill($signal, $c->{pid});
481             }
482             sleep(0.2);
483             }
484             Event::unloop_all(7.2) unless %control || %capture;
485             exit(1);
486             }
487              
488             sub reset
489             {
490             my ($self) = @_;
491             #my (undef, $f, $l) = caller;
492             #print "timer reset from $f:$l\n";
493             $self->{event}->stop();
494             $self->{event}->again();
495             }
496              
497             package Test::MultiFork::Capture;
498              
499             use Carp;
500             use strict;
501             use warnings;
502              
503             sub new
504             {
505             my ($pkg, $fh, $letter, $n) = @_;
506             my $self = bless {
507             letter => $letter,
508             n => $n,
509             seq => 1,
510             plan => undef,
511             code => "$letter-$n",
512             name => "$letter-$n",
513             }, $pkg;
514             $self->{ie} = IO::Event->new($fh, $self);
515             $capture{$self->{code}} = $self;
516             return $self;
517             }
518              
519             sub ie_input
520             {
521             my ($self, $ie) = @_;
522             $timer->reset;
523             my $bailout;
524             while (<$ie>) {
525             # print "\nRECV$self->{n}: '$_'";
526             chomp;
527             if (/^(?:(not)\s+)?ok\S*(?:\s+(\d+))?([^#]*)(?:#(.*))?$/) {
528             my ($not, $seq, $name, $comment) = ($1, $2, $3, $4);
529             $name = '' unless defined $name;
530             $comment = '' unless defined $name;
531             if (defined($seq)) {
532             if ($seq != $self->{seq}) {
533             Test::MultiFork::notokay(1, $self->{name}, $self->{letter}, $self->{n},
534             "result ordering in $self->{name}",
535             "expected '$self->{seq}' but got '$seq'");
536             }
537             $self->{seq} = $seq+1;
538             } else {
539             $self->{seq}++;
540             }
541             $comment .= " [ $self->{name} #$seq ]";
542             Test::MultiFork::notokay($not, $name, $self->{letter}, $self->{n}, $comment);
543             next;
544             }
545             if (/^1\.\.(\d+)/) {
546             Test::MultiFork::notokay(1, $self->{name}, $self->{letter}, $self->{n}, "multiple plans")
547             if defined $self->{plan};
548             $self->{plan} = $1;
549             next;
550             }
551             Test::MultiFork::cprint($self->{letter}, $self->{n}, "$_ [$self->{name}]\n");
552             }
553             exit 1 if $bailout;
554             }
555              
556             sub ie_eof
557             {
558             my ($self, $ie) = @_;
559             if ($self->{plan}) {
560             $self->{seq}--;
561             if ($self->{plan} == $self->{seq}) {
562             Test::MultiFork::notokay(0, $self->{name}, $self->{letter}, $self->{n}, "plan followed");
563             } else {
564             Test::MultiFork::notokay(1, $self->{name}, $self->{letter}, $self->{n},
565             "plan followed $self->{seq}",
566             "plan: $self->{plan} actual: $self->{seq}");
567             }
568             }
569             $ie->close();
570             delete $capture{$self->{code}};
571             Event::unloop_all(7.3) unless %control || %capture;
572             }
573              
574              
575             package Test::MultiFork::Control;
576              
577             use Carp;
578             use strict;
579             use warnings;
580              
581             sub new
582             {
583             my ($pkg, $fh, $letter, $n, $pid) = @_;
584             my $self = bless {
585             fh => $fh,
586             letter => $letter,
587             n => $n,
588             seq => 1,
589             plan => undef,
590             code => "$letter-$n",
591             name => "$letter-$n",
592             group => 'default',
593             pid => $pid,
594             # waiting
595             }, $pkg;
596             $self->{ie} = IO::Event->new($fh, $self);
597             $control{$self->{code}} = $self;
598             $groups{default}{"$letter-$n"} = $self;
599             return $self;
600             }
601              
602             sub ie_input
603             {
604             my ($self, $ie) = @_;
605             #print "\nBEGIN ie_input for $self->{code}";
606             #print "\ncontrol input...";
607             $timer->reset;
608             while (<$ie>) {
609             #my $x = $_;
610             #$x =~ s/\n/\\n/g;
611             #print "\nCONTROL: $self->{code}:$x.";
612              
613             ### name
614              
615             if (/^setname (.*)/) {
616             $self->{name} = $1;
617             $capture{$self->{code}}{name} = $1
618             if exists $capture{$self->{code}};
619              
620             ### common (shared data)
621              
622             } elsif (/^lock common/) {
623             if ($commonlock) {
624             push(@commonwait, $self);
625             $self->{status} = "waiting for common data lock";
626             } else {
627             $commonlock = $self;
628             $self->{lockstatus} = "holding common data lock";
629             #print "\nSEND TO $self->{code}: youhavelock\n";
630             print $ie "youhavelock\n";
631             }
632             } elsif (/^unlock common/) {
633             confess unless $commonlock eq $self;
634             delete $self->{lockstatus};
635             $commonlock = shift @commonwait;
636             if ($commonlock) {
637             #print "\nWAKEUP SEND TO $commonlock->{code}: youhavelock\n";
638             $commonlock->{fh}->print("youhavelock\n");
639             $commonlock->{lockstatus} = "holding common data lock";
640             delete $commonlock->{status};
641             }
642             } elsif (/^set common/) {
643             confess unless $commonlock eq $self;
644             my $size = $ie->get();
645             if (defined $size) {
646             if ($ie->can_read($size)) {
647             read($ie, $common, $size) == $size
648             || confess;
649             } else {
650             $ie->unget($size);
651             $ie->unget("set common");
652             $self->{status} = "waiting for common data";
653             last;
654             }
655             } else {
656             $ie->unget("set common");
657             $self->{status} = "waiting for size of common data";
658             last;
659             }
660             } elsif (/^get common/) {
661             #print "\nSEND TO $self->{code}: length & common";
662             print $ie length($common)."\n";
663             print $ie $common;
664              
665             ### group afiliation
666              
667             } elsif (/^setgroup (.*)/) {
668             $self->{newgroup} = $1;
669            
670             ### synchronization
671              
672             } elsif (/^waitforgroup (.*)/) {
673             $self->{waiting} = $1;
674             wake_group($self->{group});
675             ### oops
676              
677             } else {
678             confess "unknown control: $_";
679             }
680             }
681             #print "\nstatus = $self->{status}";
682             #print "\nEND ie_input for $self->{code}\n";
683             #print "return\n";
684             }
685              
686             sub ie_eof
687             {
688             my ($self, $ie) = @_;
689             $ie->close();
690             delete $control{$self->{code}};
691             Event::unloop_all(7.3) unless %control || %capture;
692             }
693              
694             sub wake_group
695             {
696             my ($group) = @_;
697             my $allthere = 1;
698             my @members = values %{$groups{$group}};
699             my $tag;
700             for my $member (@members) {
701             if ($member->{waiting}) {
702             print "$member->{code} waiting at $member->{waiting}\n" if $debug_groupwait;
703             if (defined $tag) {
704             if ($tag ne $member->{waiting}) {
705             Test::MultiFork::notokay(1,
706             $members[0]->{name},
707             $members[0]->{letter},
708             $members[0]->{n},
709             sprintf("inconsistent group wait locations: %s:'%s' vs %s:'%s'",
710             $members[0]->{name},
711             $members[0]->{waiting},
712             $member->{name},
713             $member->{waiting}));
714             }
715             } else {
716             $tag = $member->{waiting}
717             }
718             } else {
719             print "$member->{code} not waiting\n" if $debug_groupwait;
720             $allthere = 0;
721             last;
722             }
723             }
724             if ($allthere) {
725             print "ALL THERE\n" if $debug_groupwait;
726             for my $member (@members) {
727             next unless $member->{newgroup};
728             delete $groups{$member->{group}}{$member->{code}};
729             $member->{group} = $1;
730             $groups{$member->{newgroup}}{$member->{code}} = $member;
731             delete $member->{newgroup};
732             }
733             for my $member (@members) {
734             delete $member->{waiting};
735             print "WAKEUP $member->{code}\n" if $debug_groupwait;
736             $member->{fh}->print("go\n");
737             }
738             }
739             }
740              
741             1;