File Coverage

blib/lib/Test/TCP/Multi.pm
Criterion Covered Total %
statement 30 127 23.6
branch 0 34 0.0
condition 0 12 0.0
subroutine 10 18 55.5
pod 0 4 0.0
total 40 195 20.5


line stmt bran cond sub pod time code
1             package Test::TCP::Multi;
2 1     1   30337 use strict;
  1         2  
  1         37  
3 1     1   5 use base qw(Exporter);
  1         1  
  1         100  
4 1     1   6 use Config;
  1         6  
  1         41  
5 1     1   1213 use IO::Handle;
  1         9322  
  1         49  
6 1     1   1110 use IO::Socket::INET;
  1         19232  
  1         8  
7 1     1   1545 use Test::SharedFork;
  1         36763  
  1         14  
8 1     1   391 use Test::More ();
  1         3  
  1         18  
9 1     1   925 use POSIX ();
  1         7891  
  1         29  
10 1     1   10 use Storable qw(nstore_fd fd_retrieve);
  1         2  
  1         107  
11 1     1   1164 use Time::HiRes();
  1         2073  
  1         1571  
12              
13             our $VERSION = '0.00004';
14             our @EXPORT = qw( empty_port test_multi_tcp wait_port kill_proc );
15              
16             # process does not die when received SIGTERM, on win32.
17             my $TERMSIG = $^O eq 'MSWin32' ? 'KILL' : 'TERM';
18              
19             sub empty_port {
20 0   0 0 0   my $port = shift || 10000;
21 0 0 0       $port = 19000 unless $port =~ /^[0-9]+$/ && $port < 19000;
22              
23 0           while ( $port++ < 20000 ) {
24 0 0         my $sock = IO::Socket::INET->new(
25             Listen => 5,
26             LocalAddr => '127.0.0.1',
27             LocalPort => $port,
28             Proto => 'tcp',
29             (($^O eq 'MSWin32') ? () : (ReuseAddr => 1)),
30             );
31 0 0         return $port if $sock;
32             }
33 0           die "empty port not found";
34             }
35              
36             sub test_multi_tcp {
37 0     0 0   my %args = @_;
38              
39 0           my (%ports, %pids, $prev);
40 0           foreach my $server (grep { /^server/i } keys %args) {
  0            
41 0 0         $prev = $ports{$server} = empty_port( defined $prev ? $prev + 1 : () );
42             }
43              
44             my $reaper = sub {
45 0   0 0     while ( scalar keys %pids > 0 && (my $kid = waitpid( -1, POSIX::WNOHANG() ) ) > 0 ) {
46 0           delete $pids{ $kid };
47 0 0         if ($^O ne 'MSWin32') { # i'm not in hell
48 0 0         if (POSIX::WIFSIGNALED($?)) {
49 0           my $signame = (split(' ', $Config{sig_name}))[POSIX::WTERMSIG($?)];
50 0 0         if ($signame =~ /^(ABRT|PIPE)$/) {
51 0           Test::More::diag("your process received SIG$signame")
52             }
53             }
54             }
55             }
56 0           };
57              
58 0           local $SIG{CHLD} = $reaper;
59              
60 0           my %processes;
61             my %sockets;
62 0           foreach my $name ( grep { /^(?:server|client)/i } keys %args ) {
  0            
63 0           my $code = $args{$name};
64 0           my ($reader, $writer);
65 0           socketpair($reader, $writer, AF_UNIX, SOCK_STREAM, PF_UNSPEC);
66              
67 0 0         if ( my $pid = Test::SharedFork->fork() ) {
    0          
68 0           close($reader);
69 0           $sockets{$name} = $writer;
70 0           $processes{$name} = $pid;
71 0           $pids{$pid}++;
72             } elsif ($pid == 0) {
73             # wait for the parent to signal us
74 0           eval {
75 0           close($writer);
76 0           my $data = fd_retrieve($reader);
77 0           close($reader);
78              
79 0 0         if ($ports{ $name }) { # it's a server
80 0           $code->( $ports{ $name }, $data );
81             } else {
82 0           $code->( $data );
83             }
84             };
85 0 0         if ($@) {
86 0           my $message = "child $name ($$): $@";
87 0           Test::More::diag($message);
88 0           die $message;
89             }
90 0           exit;
91             } else {
92 0           die "fork failed: $!";
93             }
94             }
95              
96             # merge data
97 0           my %data;
98 0           while (my ($name, $port) = each %ports ) {
99 0           $data{ $name } = { port => $port };
100             }
101 0           while (my ($name, $pid) = each %processes ) {
102 0   0       $data{$name} ||= {};
103 0           $data{$name}->{pid} = $pid;
104             }
105              
106 0           foreach my $name ( grep { /^server/i } keys %args ) {
  0            
107             # send each process information about other processes
108 0           Storable::nstore_fd \%data, $sockets{$name};
109 0           IO::Handle::flush($sockets{$name});
110             }
111              
112 0           my ($sig, $loop);
113 0           RUN: {
114 0           $loop = 1;
115 0     0     local $SIG{INT} = sub { $sig = "INT"; $loop = 0 };
  0            
  0            
116 0     0     local $SIG{PIPE} = sub { $sig = "PIPE"; $loop = 0 };
  0            
  0            
117              
118 0           while ( my($server, $port) = each %ports) {
119 0           eval {
120 0           wait_port($port);
121             };
122 0 0         if ($@) {
123 0           Test::More::diag("Failed to spawn server $server: $@");
124 0           while ( my ($name, $pid) = each %processes ) {
125 0           kill_proc( $pid );
126             }
127 0           last RUN;
128             }
129             }
130              
131 0           foreach my $name ( grep { /^client/i } keys %args ) {
  0            
132             # send each process information about other processes
133 0           Storable::nstore_fd \%data, $sockets{$name};
134 0           IO::Handle::flush($sockets{$name});
135             }
136              
137 0   0       while($loop && scalar keys %pids) {
138 0           $reaper->();
139             };
140              
141 0 0         if (scalar keys %pids) {
142 0           while (my($name, $pid) = each %processes) {
143 0           kill_proc( $pid );
144             }
145             }
146            
147             }
148              
149 0 0         if ($sig) {
150 0           kill $sig, $$; # rethrow signal after cleanup
151             }
152             }
153              
154             sub kill_proc {
155 0     0 0   foreach my $pid (@_) {
156 0 0         next unless $pid;
157 0           kill $TERMSIG => $pid;
158             }
159             }
160              
161             sub _check_port {
162 0     0     my ($port) = @_;
163              
164 0           my $remote = IO::Socket::INET->new(
165             Proto => 'tcp',
166             PeerAddr => '127.0.0.1',
167             PeerPort => $port,
168             );
169 0 0         if ($remote) {
170 0           close $remote;
171 0           return 1;
172             }
173             else {
174 0           return 0;
175             }
176             }
177              
178             sub wait_port {
179 0     0 0   my $port = shift;
180              
181 0           my $retry = 100;
182 0           while ( $retry-- ) {
183 0 0         return if _check_port($port);
184 0           Time::HiRes::sleep(0.1);
185             }
186 0           die "Waited for port $port, but was not available";
187             }
188              
189             1;
190              
191             __END__