File Coverage

blib/lib/Net/DNAT.pm
Criterion Covered Total %
statement 21 232 9.0
branch 0 90 0.0
condition 0 35 0.0
subroutine 7 15 46.6
pod 5 6 83.3
total 33 378 8.7


line stmt bran cond sub pod time code
1             package Net::DNAT;
2              
3 1     1   415 use strict;
  1         16  
  1         33  
4 1     1   5 use Exporter;
  1         2  
  1         32  
5 1     1   4 use vars qw(@ISA $VERSION $listen_port);
  1         4  
  1         57  
6 1     1   471 use Net::Server::Multiplex 0.85;
  1         62458  
  1         31  
7 1     1   548 use Net::Ping 2.29;
  1         8992  
  1         57  
8 1     1   10 use IO::Socket;
  1         2  
  1         9  
9 1     1   425 use Carp ();
  1         2  
  1         3521  
10              
11             $VERSION = '0.15';
12             @ISA = qw(Net::Server::Multiplex);
13              
14             $listen_port = getservbyname("http", "tcp");
15              
16             # DEBUG warnings
17             $SIG{__WARN__} = sub {
18             &Carp::cluck((scalar localtime).": [pid $$] WARNING\n : $_[0]");
19             };
20              
21             # DEBUG dies
22             my $dying = 0;
23             $SIG{__DIE__} = sub {
24             $dying++;
25             if ($dying > 2) {
26             # Safety to avoid recursive or infinite dies
27             return exit(1);
28             }
29             print STDERR ((scalar localtime),": [pid $$] CRASHED\n : ",@_,"\n");
30             if ($^S) {
31             # Die within eval does not count.
32             $dying--;
33             # Just use regular die.
34             return CORE::die(@_);
35             }
36             # Stack trace of who crashed.
37             &Carp::confess(@_);
38             };
39              
40              
41             sub _resolve_it {
42 0     0     my $string = shift;
43 0           my @result = ();
44 0           my $port = $listen_port;
45 0 0         if ($string =~ s/:(\d+)//) {
    0          
46 0           $port = $1;
47             } elsif ($string =~ s/:(\w+)//) {
48 0           $port = getservbyname($1, "tcp");
49             }
50 0 0         if ($string !~ /^\d+\.\d+\.\d+\.\d+$/) {
51 0           my $j;
52 0           ($j, $j, $j, $j, @result) = gethostbyname($string);
53 0 0         die "Failed to resolve [$string] to an IP address\n"
54             unless @result;
55 0           map { $_ = join(".", unpack("C4", $_)); } @result;
  0            
56             } else {
57 0           @result = ($string);
58             }
59 0           map { $_ .= ":$port"; } @result;
  0            
60 0           return @result;
61             }
62              
63             sub post_configure_hook {
64 0     0 1   my $self = shift;
65             my $conf_hash = {
66 0           @{ $self->{server}->{configure_args} }
  0            
67             };
68             my $old_pools_ref = $conf_hash->{pools} ||
69 0   0       die "The 'pools' setting is missing!\n";
70 0 0 0       unless (ref $old_pools_ref &&
71             ref $old_pools_ref eq "HASH") {
72 0           $old_pools_ref = { default => $old_pools_ref };
73             }
74              
75 0           my $new_pools_ref = {};
76 0           foreach my $poolname (keys %{ $old_pools_ref }) {
  0            
77             # The first element is the cycle index
78 0           my @list = (0);
79 0           my $dest = $old_pools_ref->{$poolname};
80 0 0         if (!ref $dest) {
    0          
81 0           push(@list, _resolve_it($dest));
82             } elsif (ref $dest eq "ARRAY") {
83 0           foreach my $i (@{ $dest }) {
  0            
84 0           push(@list, _resolve_it($i));
85             }
86             } else {
87 0           die "Unimplemented type of pool destination [".(ref $dest)."]\n";
88             }
89 0           $new_pools_ref->{$poolname} = [ @list ];
90             }
91 0           $self->{orig_pools} = $self->{pools} = $new_pools_ref;
92              
93 0   0       my $old_switch_table_ref = $conf_hash->{host_switch_table} || {};
94 0           my $new_switch_table_ref = {};
95 0           foreach my $old_host (keys %{ $old_switch_table_ref }) {
  0            
96 0           my $new_host = $old_host;
97 0 0         if ($new_host =~ s/^([a-z0-9\-\.]*[a-z])\.?$/\L$1/i) {
98 0           $new_switch_table_ref->{$new_host} = $old_switch_table_ref->{$old_host};
99             } else {
100 0           die "Invalid hostname [$old_host] in host_switch_table\n";
101             }
102             }
103 0           $self->{host_switch_table} = $new_switch_table_ref;
104              
105 0   0       $self->{switch_filters} = $conf_hash->{switch_filters} || [];
106             # Run a quick sanity check on each pool destination
107 0           for (my $i = scalar $#{ $self->{switch_filters} };
  0            
108             $i > 0; $i-=2) {
109 0 0         if (!$self->{pools}->{$self->{switch_filters}->[$i]}) {
110 0           die "No such 'switch_filters' pool [".($self->{switch_filters}->[$i])."]\n";
111             }
112             }
113              
114 0   0       $self->{default_pool} = $conf_hash->{default_pool} || undef;
115 0 0         if (!defined $self->{default_pool}) {
116 0 0         if (( scalar keys %{ $self->{pools} } ) == 1) {
  0            
117             # Only one pool? Guess that should be the default.
118 0           ($self->{default_pool}) = keys %{ $self->{pools} };
  0            
119             } else {
120 0           die "The 'default_pool' setting must be specified with multiple pools!\n";
121             }
122             }
123 0 0         if (!$self->{pools}->{$self->{default_pool}}) {
124 0           die "The 'default_pool' [$self->{default_pool}] has not been defined!\n";
125             }
126              
127             # Plenty of time to establish the tcp three-way handshake
128             # for a connection to a destination node in a pool.
129             $self->{connect_timeout} =
130             defined $conf_hash->{connect_timeout} ?
131 0 0         $conf_hash->{connect_timeout} : 3;
132              
133 0 0         if (exists $conf_hash->{check_for_dequeue}) {
134 0 0 0       if (defined $conf_hash->{check_for_dequeue} &&
135             $conf_hash->{check_for_dequeue} > 0) {
136             $self->{server}->{check_for_dequeue} =
137 0           $conf_hash->{check_for_dequeue};
138             }
139             } else {
140 0           $self->{server}->{check_for_dequeue} = 60;
141             }
142              
143 0 0         $self->check_pools if $self->{server}->{check_for_dequeue};
144             }
145              
146             sub run_dequeue {
147 0     0 1   my $self = shift;
148 0           $self->check_pools;
149             }
150              
151             sub check_pools {
152 0     0 0   my $self = shift;
153 0           my $new_pools = {};
154 0           my $ping_cache = {};
155 0           my $pinger = new Net::Ping "tcp", $self->{connect_timeout};
156 0           $pinger->tcp_service_check(1);
157 0           foreach my $pool (keys %{ $self->{orig_pools} }) {
  0            
158 0 0         my $index = $self->{pools}->{$pool} ? $self->{pools}->{$pool}->[0] : 0;
159 0           for(my $i = 1; $i < @{ $self->{orig_pools}->{$pool} }; $i++) {
  0            
160 0           $self->log(4, "Checking pool [$pool] index [$i]...");
161 0           my ($host, $port) = $self->{orig_pools}->{$pool}->[$i] =~ /^(.+):(\d+)$/;
162 0 0 0       next unless($host && $port);
163              
164 0           my $alive;
165 0 0         if(exists $ping_cache->{"$host:$port"}) {
166 0           $alive = $ping_cache->{"$host:$port"};
167 0           $self->log(4, "Cached pool [$pool] index [$i] at [$host:$port] is [$alive]");
168             } else {
169 0           $self->log(4, "Testing pool [$pool] index [$i] at [$host:$port]...");
170 0           $pinger->{port_num} = $port;
171 0           $alive = $ping_cache->{"$host:$port"} = $pinger->ping($host);
172 0 0         if (!$alive) {
173 0           $self->log(1, "WARNING: [$host:$port] is down!");
174             }
175             }
176 0 0         next unless($alive);
177 0 0         if (!$new_pools->{$pool}) {
178 0           $new_pools->{$pool} = [$index];
179             }
180 0           push @{$new_pools->{$pool}}, $self->{orig_pools}->{$pool}->[$i];
  0            
181             }
182             }
183 0           $pinger->close;
184 0           $self->{pools} = $new_pools;
185             }
186              
187             sub mux_connection {
188 0     0 1   my $self = shift;
189 0           shift; # I do not need mux
190 0           my $fh = shift;
191 0           $self->{net_server}->log(4, "Connection on fileno [".fileno($fh)."]");
192 0           $self->{state} = "REQUEST";
193             # Store tied file handle within object
194 0           $self->{fh} = $fh;
195             # Grab peer information before it's gone
196 0           $self->{peeraddr} = $self->{net_server}->{server}->{peeraddr};
197 0           $self->{peerport} = $self->{net_server}->{server}->{peerport};
198             }
199              
200              
201             sub mux_input {
202 0     0 1   my $self = shift;
203 0           my $mux = shift;
204 0           my $fh = shift;
205 0           my $data = shift;
206              
207 0           my $pool = undef; # Which pool to redirect to
208              
209 0 0 0       unless (defined $fh and defined fileno($fh)) {
210 0           $self->{net_server}->log(4, "mux_input: WEIRD fh! Trashing (".length($$data)." bytes) input. (This should never happen.)");
211 0           $$data = "";
212 0           return;
213             }
214              
215 0 0         if ($self->{state} eq "REQUEST") {
216 0           $self->{net_server}->log(4, "input on [REQUEST] ($$data)");
217             # Ignore leading whitespace and blank lines
218 0           while ($$data =~ s/^\s+//) {}
219 0 0         if ($$data =~ s%^([^\r\n]*)\r?\n%%) {
220             # First newline reached.
221 0           my $request = $1;
222 0 0         if ($request =~ m%
223             (\w+)\s+ # method
224             (/.*)\s+ # path
225             HTTP/(1\.[01]) # protocol
226             $%ix) {
227 0           $self->{request_method} = $1; # GET or POST
228 0           $self->{request_path} = $2; # URL path
229 0           $self->{request_proto} = $3; # 1.0 or 1.1
230 0           $self->{state} = "HEADERS";
231             } else {
232 0           $self->{state} = "CONTENT";
233 0           $_ = $request;
234 0           goto POOL_DETERMINED;
235             }
236             }
237             }
238              
239 0 0 0       if ($self->{state} eq "HEADERS" && $$data) {
240 0           $self->{net_server}->log(4, "input on [HEADERS] ($$data)");
241             # Search for the "nothing" line
242 0 0         if ($$data =~ s/^((.*\n)*)\r?\n//) {
243             # Found! Jump to next state.
244 0           $self->{request_headers_block} = $1;
245             # Wipe some headers for cleaner protocol
246             # conversion and for security reasons.
247             $self->{request_headers_block} =~
248 0           s%^(Connection|
249             Keep-Alive|
250             Remote-Addr|
251             Remote-Port|
252             ):.*\n
253             %%gmix;
254              
255             # Add headers for Apache::DNAT
256             $self->{request_headers_block} .=
257 0           "Remote-Addr: $self->{peeraddr}\r\n".
258             "Remote-Port: $self->{peerport}\r\n";
259              
260 0           $self->{state} = "CONTENT";
261             # Determine correct pool destination
262             # based on the request $_
263 0           $_ = "$self->{request_method} $self->{request_path} HTTP/1.0\r\n$self->{request_headers_block}";
264             # Rectify host header for simplicity
265 0           s/^Host:\s*([\w\-\.]*\w)\.?((:\d+)?)\r?\n/Host: \L$1$2\r\n/im;
266              
267             # First run through the switch_filters
268 0           my @switch_filters = @{ $self->{net_server}->{switch_filters} };
  0            
269 0           while (@switch_filters) {
270 0           my ($ref, $then_pool) = splice(@switch_filters, 0, 2);
271 0 0         if (my $how = ref $ref) {
272 0 0         if ($how eq "CODE") {
    0          
273 0 0         if (&$ref()) {
274 0           $pool = $then_pool;
275 0           last;
276             }
277             } elsif ($how eq "Regexp") {
278 0 0         if ($_ =~ $ref) {
279 0           $pool = $then_pool;
280 0           last;
281             }
282             } else {
283 0           die "Switch filter to [$then_pool] smells too weird!\n";
284             }
285             } else {
286 0           die "Switch filter [$ref] is not a ref!\n";
287             }
288             }
289              
290             # Then run through the host_switch_table
291 0 0 0       if (!defined($pool) && m%^Host: ([\w\-\.]+)%m) {
292 0           my $request_host = $1;
293              
294 0           foreach my $host (keys %{ $self->{net_server}->{host_switch_table} }) {
  0            
295 0 0         if ( $request_host eq $host ) {
296 0           $pool = $self->{net_server}->{host_switch_table}->{$host};
297 0           last;
298             }
299             }
300             }
301              
302             POOL_DETERMINED:
303             # Otherwise, just use the default
304 0 0         if (!defined($pool)) {
305 0           $pool = $self->{net_server}->{default_pool};
306             }
307              
308 0           $self->{net_server}->log(4, "POOL DETERMINED: [$pool]");
309 0           my $pool_ref = $self->{net_server}->{pools}->{$pool};
310 0 0         if (!$pool_ref) {
311 0           $self->{net_server}->log(4, "Pool [$pool] is down.");
312 0           $mux->write($fh, "ERROR: Pool [$pool] is down.\n");
313 0           $$data = "";
314 0           $mux->shutdown($fh, 2);
315 0           return;
316             }
317              
318             # Increment cycle counter.
319             # If it exceeds pool size
320 0 0         if (++($pool_ref->[0]) > $#{ $pool_ref }) {
  0            
321             # Start over with 1 again.
322 0           $pool_ref->[0] = 1;
323             }
324 0           $self->{net_server}->log(4, "POOL CYCLE INDEX [$pool_ref->[0]]");
325 0           my $peeraddr = $pool_ref->[$pool_ref->[0]];
326 0           $self->{net_server}->log(4, "Connecting to destination [$peeraddr]");
327              
328 0           $@ = "";
329 0           my $peersock = eval {
330 0           local $SIG{__DIE__} = 'DEFAULT';
331 0     0     local $SIG{ALRM} = sub { die "Timed out!\n"; };
  0            
332 0           alarm ($self->{net_server}->{connect_timeout});
333 0 0         new IO::Socket::INET $peeraddr or die "$!\n";
334             };
335 0           alarm(0); # Reset alarm
336 0 0         $peersock = undef if $@;
337 0 0         if ($peersock) {
338 0           $self->{net_server}->log(4, "Connected successfully with fileno [".fileno($peersock)."]");
339 0           $mux->add($peersock);
340             my $proxy_object = bless {
341             state => "CONTENT",
342             fh => $peersock,
343             proto => $self->{request_proto},
344             complement_object => $self,
345             net_server => $self->{net_server},
346 0           }, (ref $self);
347 0           $self->{net_server}->log(4, "Complement for socket on fileno [".fileno($fh)."] created on fileno [".fileno($peersock)."]");
348 0           $self->{complement_object} = $proxy_object;
349 0           $mux->set_callback_object($proxy_object, $peersock);
350 0           $mux->write($peersock, "$_\r\n");
351             #$_ = "$self->{request_method} $self->{request_path} HTTP/1.0\r\n$self->{request_headers_block}";
352             } else {
353 0           $self->{net_server}->log(4, "Could not connect to [$peeraddr]: $@");
354 0           $mux->write($fh, "ERROR: Pool [$pool] Index [$pool_ref->[0]] (Peer $peeraddr) is down: $!\n");
355 0           $$data = "";
356 0           $mux->shutdown($fh, 2);
357 0 0         $self->{net_server}->check_pools if $self->{net_server}->{server}->{check_for_dequeue};
358             }
359             }
360             }
361              
362 0 0 0       if ($self->{state} eq "CONTENT" && $$data) {
363             # Test to make sure complement is up
364 0 0 0       if ($self->{complement_object} and $self->{complement_object}->{fh} and
      0        
365             defined fileno($self->{complement_object}->{fh})) {
366 0           $self->{net_server}->log(4, "input on [CONTENT] on fileno [".fileno($fh)."] (".length($$data)." bytes) to socket on fileno [".fileno($self->{complement_object}->{fh})."]");
367 0           $mux->write($self->{complement_object}->{fh}, $$data);
368             } else {
369 0           $self->{net_server}->log(4, "mux_input: Complement CONTENT socket is gone! Trashing (".length($$data)." bytes) input.");
370             # close() is a bit stronger than shutdown()
371 0           $mux->kill_output($fh);
372 0           $mux->close($fh);
373             }
374             # Consumed everything
375 0           $$data = "";
376             }
377              
378             }
379              
380             sub mux_eof {
381 0     0 1   my $self = shift;
382 0           my $mux = shift;
383 0           my $fh = shift;
384 0           my $data = shift;
385 0           $self->{net_server}->log(4, "EOF received on fileno [".fileno($fh)."] ($$data)");
386              
387             # If it hasn't been consumed by now,
388             # then too bad, wipe it anyways.
389 0           $$data = "";
390 0 0         if ($self->{complement_object}) {
391 0           $self->{net_server}->log(4, "Shutting down complement on fileno [".fileno($self->{complement_object}->{fh})."]");
392             # If this end was closed, then tell the
393             # complement socket to close.
394 0           $mux->shutdown($self->{complement_object}->{fh}, 2);
395             # Make sure that when the complement
396             # socket finishes via mux_eof, that
397             # it doesn't waste its time trying
398             # to shutdown my socket, because I'm
399             # already finished.
400 0           delete $self->{complement_object}->{complement_object};
401             }
402             }
403              
404              
405             1;
406             __END__