File Coverage

blib/lib/POE/Component/Server/DNS.pm
Criterion Covered Total %
statement 236 311 75.8
branch 59 118 50.0
condition 14 54 25.9
subroutine 33 42 78.5
pod 8 8 100.0
total 350 533 65.6


line stmt bran cond sub pod time code
1             package POE::Component::Server::DNS;
2             $POE::Component::Server::DNS::VERSION = '0.32';
3             #ABSTRACT: A non-blocking, concurrent DNS server POE component
4              
5 5     5   973570 use strict;
  5         13  
  5         136  
6 5     5   27 use warnings;
  5         9  
  5         188  
7 5     5   24 use POE qw(Component::Client::DNS Wheel::ReadWrite Component::Client::DNS::Recursive Wheel::SocketFactory Filter::DNS::TCP);
  5         9  
  5         33  
8 5     5   180098 use Socket;
  5         14  
  5         3444  
9 5     5   29 use Net::DNS::RR;
  5         10  
  5         111  
10 5     5   26 use IO::Socket::INET;
  5         13  
  5         83  
11              
12             sub spawn {
13 5     5 1 94 my $package = shift;
14 5         25 my %args = @_;
15 5         45 $args{lc $_} = delete $args{$_} for keys %args;
16              
17 5         17 my $options = delete $args{options};
18              
19 5         19 my $self = bless \%args, $package;
20              
21 5         51 $self->{_handlers} = [ ];
22              
23 5 100       26 unless ( $self->{no_clients} ) {
24 4         49 $self->{_localhost} = Net::DNS::RR->new('localhost. 0 A 127.0.0.1');
25             }
26              
27 5 100 66     7513 $self->{session_id} = POE::Session->create(
28             object_states => [
29             $self => { shutdown => '_shutdown', _sock_err_tcp => '_sock_err', },
30             $self => [ qw(_start _dns_incoming _dns_err _dns_response _dns_recursive add_handler del_handler _handled_req _sock_up _sock_err log_event _sock_up_tcp) ],
31             ],
32             heap => $self,
33             options => ( $options && ref $options eq 'HASH' ? $options : { } ),
34             )->ID();
35              
36 5         617 return $self;
37             }
38              
39             sub _start {
40 5     5   2047 my ($kernel,$self,$session) = @_[KERNEL,OBJECT,SESSION];
41              
42 5         25 $self->{session_id} = $session->ID();
43              
44 5 50       40 if ( $self->{alias} ) {
45 0         0 $kernel->alias_set($self->{alias});
46             }
47             else {
48 5         34 $kernel->alias_set("$self");
49 5         205 $self->{alias} = "$self";
50             }
51              
52 5 100       47 unless ( $self->{no_clients} ) {
53 4 50 33     96 $self->{resolver_opts} = { } unless $self->{resolver_opts} and ref $self->{resolver_opts} eq 'HASH';
54 4         11 delete $self->{resolver_opts}->{Alias};
55 4         22 $self->{resolver} = POE::Component::Client::DNS->spawn( Alias => "resolver" . $self->session_id(), %{ $self->{resolver_opts} } );
  4         54  
56             # $self->{recursive}->hints( { event => '_hints' } );
57             }
58              
59             $self->{factory} = POE::Wheel::SocketFactory->new(
60             SocketProtocol => 'udp',
61             BindAddress => $self->{address} || INADDR_ANY,
62 5 50 50     4281 BindPort => ( defined $self->{port} ? $self->{port} : 53 ),
63             SuccessEvent => '_sock_up',
64             FailureEvent => '_sock_err',
65             );
66              
67             $self->{factory_tcp} = POE::Wheel::SocketFactory->new(
68             SocketProtocol => 'tcp',
69             Reuse => 1,
70             BindAddress => $self->{address} || INADDR_ANY,
71 5 50 50     1517 BindPort => ( defined $self->{port} ? $self->{port} : 53 ),
72             SuccessEvent => '_sock_up_tcp',
73             FailureEvent => '_sock_err_tcp',
74             );
75              
76 5         1900 undef;
77             }
78              
79             sub _sock_up {
80 5     5   3010 my ($kernel,$self,$dns_socket) = @_[KERNEL,OBJECT,ARG0];
81 5         59 $self->{_sockport} = ( sockaddr_in( getsockname($dns_socket) ) )[0];
82 5         109 delete $self->{factory};
83 5         153 $self->{dnsrw} = POE::Wheel::ReadWrite->new(
84             Handle => $dns_socket,
85             Driver => DNS::Driver::SendRecv->new(),
86             Filter => DNS::Filter::UDPDNS->new(),
87             InputEvent => '_dns_incoming',
88             ErrorEvent => '_dns_err',
89             );
90 5         1624 undef;
91             }
92              
93             sub _sock_up_tcp {
94 0     0   0 my ($kernel,$self,$dns_socket, $address, $port) = @_[KERNEL,OBJECT,ARG0, ARG1, ARG2];
95 0         0 $address = inet_ntoa($address);
96              
97 0         0 POE::Session->create(
98             object_states => [
99             $self => { _start => '_socket_success', _stop => '_socket_death' },
100             $self => [ qw( _sock_err _socket_input _socket_death _handled_req _dns_incoming _dns_recursive _dns_response) ],
101             ],
102             args => [$dns_socket],
103             heap => { _tcp_sockport => "$address:$port", },
104             );
105              
106 0         0 undef;
107             }
108              
109              
110             sub _socket_death {
111 0     0   0 my $heap = $_[HEAP];
112 0 0       0 if ($heap->{socket_wheel}) {
113 0         0 delete $heap->{socket_wheel};
114             }
115             }
116              
117             sub _socket_success {
118 0     0   0 my ($heap,$kernel,$connected_socket) = @_[HEAP, KERNEL, ARG0];
119              
120 0         0 $heap->{socket_wheel} = POE::Wheel::ReadWrite->new(
121             Handle => $connected_socket,
122             Filter => POE::Filter::DNS::TCP->new(),
123             InputEvent => '_dns_incoming',
124             ErrorEvent => '_sock_err',
125             );
126             }
127              
128             sub _socket_input {
129 0     0   0 my ($heap, $buf) = @_[HEAP, ARG0];
130 0         0 warn Dumper $buf;
131 0         0 delete $heap->{socket_wheel};
132             }
133              
134             sub _sock_err {
135 0     0   0 my ($operation, $errnum, $errstr, $wheel_id) = @_[ARG0..ARG3];
136             # ErrorEvent may also indicate EOF on a FileHandle by returning operation "read" error 0. For sockets, this means the remote end has closed the connection.
137 0 0 0     0 return undef if ($operation eq "read" and $errnum == 0);
138 0         0 delete $_[OBJECT]->{factory};
139 0         0 delete $_[OBJECT]->{"factory_tcp"};
140 0         0 die "Wheel $wheel_id generated $operation error $errnum: $errstr\n";
141 0         0 undef;
142             }
143              
144             sub session_id {
145 12     12 1 5853874 return $_[0]->{session_id};
146             }
147              
148             sub resolver {
149 0     0 1 0 return $_[0]->{resolver};
150             }
151              
152             sub sockport {
153 5     5 1 25021942 return $_[0]->{_sockport};
154             }
155              
156             sub shutdown {
157 0     0 1 0 my $self = shift;
158 0         0 $poe_kernel->post( $self->session_id() => 'shutdown' );
159             }
160              
161             sub _shutdown {
162 5     5   2394 my ($kernel,$self) = @_[KERNEL,OBJECT];
163 5         38 $kernel->alarm_remove_all();
164 5         308 $kernel->alias_remove( $_ ) for $kernel->alias_list( $_[SESSION] );
165 5         534 delete $self->{dnsrw};
166 5         1836 delete $self->{'factory'};
167 5         45 delete $self->{'factory_tcp'};
168 5 100       1123 unless ( $self->{no_clients} ) {
169 4         31 $self->{resolver}->shutdown();
170             #$self->{recursive}->shutdown();
171             }
172 5         987 $kernel->refcount_decrement( $_->{session}, __PACKAGE__ ) for @{ $self->{_handlers} };
  5         34  
173 5         108 $kernel->refcount_decrement( $_, __PACKAGE__ ) for keys %{ $self->{_sessions} };
  5         29  
174 5         108 delete $self->{_handlers};
175 5         25 undef;
176             }
177              
178             sub log_event {
179 1     1 1 65 my ($kernel,$self,$sender,$event) = @_[KERNEL,OBJECT,SENDER,ARG0];
180 1         4 $sender = $sender->ID();
181              
182 1 50 33     9 if ( exists $self->{_sessions}->{ $sender } and !$event ) {
183 0         0 delete $self->{_sessions}->{ $sender };
184 0         0 $kernel->refcount_decrement( $sender => __PACKAGE__ );
185 0         0 return;
186             }
187              
188 1 50       9 if ( exists $self->{_sessions}->{ $sender } ) {
189 0         0 $self->{_sessions}->{ $sender } = $event;
190 0         0 return;
191             }
192              
193 1         3 $self->{_sessions}->{ $sender } = $event;
194 1         5 $kernel->refcount_increment( $sender => __PACKAGE__ );
195 1         34 return;
196             }
197              
198             sub add_handler {
199 2     2 1 326 my ($kernel,$self,$sender) = @_[KERNEL,OBJECT,SENDER];
200 2         8 $sender = $sender->ID();
201              
202             # Get the arguments
203 2         9 my $args;
204 2 50       9 if (ref($_[ARG0]) eq 'HASH') {
205 2         4 $args = { %{ $_[ARG0] } };
  2         10  
206             }
207             else {
208 0         0 warn "first parameter must be a ref hash, trying to adjust. "
209             ."(fix this to get rid of this message)";
210 0         0 $args = { @_[ARG0 .. $#_ ] };
211             }
212              
213 2         6 $args->{ lc $_ } = delete $args->{$_} for keys %{ $args };
  2         23  
214              
215 2 50       10 unless ( $args->{label} ) {
216 0         0 warn "you must supply a label argument, to make it unique\n";
217 0         0 return;
218             }
219              
220 2 50       4 if ( grep { $_->{label} eq $args->{label} } @{ $self->{_handlers} } ) {
  0         0  
  2         10  
221 0         0 warn "you must supply a unique label argument, I already have that one\n";
222 0         0 return;
223             }
224              
225 2 50       9 unless ( $args->{event} ) {
226 0         0 warn "you must supply an event argument, otherwise where do I send the replies to\n";
227 0         0 return;
228             }
229              
230 2 50       8 unless ( $args->{match} ) {
231 0         0 warn "you must supply a match argument, otherwise what's the point\n";
232 0         0 return;
233             }
234              
235 2         5 my $regex;
236 2         16 eval { $regex = qr/$args->{match}/ };
  2         47  
237              
238 2 50       8 if ( $@ ) {
239 0         0 warn "The match argument you supplied was fubar, please try harder\n";
240 0         0 return;
241             }
242             else {
243 2         4 $args->{match} = $regex;
244             }
245              
246 2 50       12 $args->{session} = $sender unless $args->{session};
247 2 50       11 if ( my $ref = $kernel->alias_resolve( $args->{session} ) ) {
248 2         54 $args->{session} = $ref->ID();
249             }
250             else {
251 0         0 $args->{session} = $sender->ID();
252             }
253              
254 2         16 $kernel->refcount_increment( $args->{session}, __PACKAGE__ );
255              
256 2         60 push @{ $self->{_handlers} }, $args;
  2         8  
257              
258 2         7 undef;
259             }
260              
261             sub del_handler {
262 0     0 1 0 my ($kernel,$self,$label) = @_[KERNEL,OBJECT,ARG0];
263 0 0       0 return unless $label;
264              
265 0         0 my $i = 0; my $rec;
  0         0  
266 0         0 for ( @{ $self->{_handlers} } ) {
  0         0  
267 0 0       0 if ( $_->{label} eq $label ) {
268 0         0 splice( @{ $self->{_handlers} }, $i, 1 );
  0         0  
269 0         0 $rec = $_;
270 0         0 last;
271             }
272             }
273              
274 0         0 $kernel->refcount_decrement( $rec->{session}, __PACKAGE__ );
275 0         0 undef;
276             }
277              
278             sub _dns_incoming {
279 562     562   23569 my($kernel,$self,$heap,$session,$dnsq) = @_[KERNEL,OBJECT,HEAP,SESSION,ARG0];
280              
281             # TCP remote address is handled differently than UDP, so fix that here.
282 562 50       1606 if (defined($heap->{_tcp_sockport})) {
283 0         0 $dnsq->answerfrom($heap->{_tcp_sockport});
284             }
285              
286 562         1795 my($q) = $dnsq->question();
287 562 50       3740 return unless $q;
288              
289 562         705 foreach my $handler ( @{ $self->{_handlers} } ) {
  562         1503  
290 380 100       1107 next unless $q->qname =~ $handler->{match};
291 376         16686 my $callback = $session->callback( '_handled_req', $dnsq );
292             $kernel->post(
293             $handler->{session},
294             $handler->{event},
295             $q->qname,
296             $q->qclass,
297             $q->qtype,
298             $callback,
299 376         24358 $dnsq->answerfrom, $dnsq, $handler->{'label'} );
300 376         46004 return;
301             }
302              
303 186 100       879 if ( $self->{no_clients} ) {
304             # Refuse unhandled requests, like an authoritative-only
305             # BIND server would.
306 1         4 $dnsq->header->rcode('REFUSED');
307 1         133 $dnsq->header->qr(1);
308 1         22 $dnsq->header->aa(0);
309 1         18 $dnsq->header->ra(0);
310 1         17 $dnsq->header->ad(0);
311 1         19 my $str = $dnsq->string(); # Doesn't work without this, fucked if I know why.
312 1         405 $self->_dispatch_log( $dnsq );
313             # $self->{dnsrw}->put( $dnsq ) if $self->{dnsrw};
314 1 50 33     1080 $self->{"dnsrw"}->put( $dnsq ) if (!(defined($heap) && defined($heap->{socket_wheel})) && $self->{"dnsrw"});
      33        
315 1 50       59 $heap->{socket_wheel}->put($dnsq) if $heap->{socket_wheel};
316              
317 1         4 return;
318             }
319              
320 185 50       665 if ( $q->qname =~ /^localhost\.*$/i ) {
321 0         0 $dnsq->push( answer => $self->{_localhost} );
322 0         0 $self->_dispatch_log( $dnsq );
323 0 0 0     0 $self->{"dnsrw"}->put( $dnsq ) if (!(defined($heap) && defined($heap->{socket_wheel})) && $self->{"dnsrw"});
      0        
324 0 0       0 $heap->{socket_wheel}->put($dnsq) if $heap->{socket_wheel};
325 0         0 return;
326             }
327              
328 185 100       7731 if ( $self->{forward_only} ) {
329 124         383 my %query = (
330             class => $q->qclass,
331             type => $q->qtype,
332             host => $q->qname,
333             context => [ $dnsq->answerfrom, $dnsq->header->id ],
334             event => '_dns_response',
335             );
336              
337 124         5435 my $response = $self->{resolver}->resolve( %query );
338 124 50       175878 $kernel->yield( '_dns_response', $response ) if $response;
339              
340             }
341             else {
342             # $self->{recursive}->query_dorecursion( { event => '_dns_recursive', data => [ $dnsq, $dnsq->answerfrom, $dnsq->header->id ], }, $q->qname, $q->qtype, $q->qclass );
343 61         236 POE::Component::Client::DNS::Recursive->resolve(
344             event => '_dns_recursive',
345             context => [ $dnsq, $dnsq->answerfrom, $dnsq->header->id ],
346             host => $q->qname,
347             type => $q->qtype,
348             class => $q->qclass,
349             );
350             }
351              
352 185         91286 undef;
353             }
354              
355             sub _handled_req {
356 376     376   219010 my ($kernel,$self,$passthru,$passback,$heap) = @_[KERNEL,OBJECT,ARG0,ARG1,HEAP];
357 376         622 my $reply = $passthru->[0];
358 376         512 my ($rcode, $ans, $auth, $add, $headermask) = @{ $passback };
  376         843  
359 376         1176 $reply->header->rcode($rcode);
360 376 50       35153 $reply->push("answer", @$ans) if $ans;
361 376 50       5720 $reply->push("authority", @$auth) if $auth;
362 376 50       4555 $reply->push("additional", @$add) if $add;
363 376 50       3780 if (!defined ($headermask)) {
364 0 0       0 $reply->header->ra($self->{no_clients} ? 0 : 1);
365 0         0 $reply->header->ad(0);
366             }
367             else {
368 376 50       1547 $reply->header->aa(1) if $headermask->{'aa'};
369 376 50       6252 $reply->header->ra(1) if $headermask->{'ra'};
370 376 50       970 $reply->header->ad(1) if $headermask->{'ad'};
371             }
372              
373 376         943 $reply->header->qr(1);
374 376         5595 $self->_dispatch_log( $reply );
375              
376 376 50 33     3675 $self->{"dnsrw"}->put( $reply ) if (!(defined($heap) && defined($heap->{socket_wheel})) && $self->{"dnsrw"});
      33        
377 376 50       14219 $heap->{socket_wheel}->put($reply) if $heap->{socket_wheel};
378 376         1268 undef;
379             }
380              
381             sub _dns_err {
382 0     0   0 my($kernel,$self, $op, $errnum, $errstr) = @_[KERNEL,OBJECT, ARG0..ARG2];
383 0         0 warn "DNS readwrite: $op generated error $errnum: $errstr\n";
384 0 0 0     0 if (!($op eq "read" and ($errnum == 0 || $errnum == 104)))
      0        
385             {
386 0         0 warn "SHUTDOWN";
387 0         0 $kernel->yield('shutdown');
388             }
389 0         0 undef;
390             }
391              
392             sub _dns_recursive {
393 61     61   8214930 my ($kernel,$heap,$self,$data) = @_[KERNEL,HEAP,OBJECT,ARG0];
394 61 100       382 return if $data->{error};
395 49         96 my ($dnsq,$answerfrom,$id) = @{ $data->{context} };
  49         180  
396              
397 49         132 my $socket = $heap->{socket_wheel};
398              
399 49         116 my $response = $data->{response};
400 49 50       275 if ( $response ) {
401 49         199 $response->header->id( $id );
402 49         752 $response->answerfrom( $answerfrom );
403 49         514 $self->_dispatch_log( $response );
404 49 50 33     573 $self->{"dnsrw"}->put( $response ) if (!(defined($socket)) && $self->{"dnsrw"});
405 49 50       2262 $socket->put($response) if $socket;
406 49         218 return;
407             }
408 0         0 $dnsq->header->rcode('NXDOMAIN');
409 0         0 $self->_dispatch_log( $dnsq );
410             # $self->{dnsrw}->put( $dnsq ) if $self->{dnsrw};
411 0 0 0     0 $self->{"dnsrw"}->put( $dnsq ) if (!(defined($socket)) && $self->{"dnsrw"});
412 0 0       0 $socket->put($dnsq) if $socket;
413              
414 0         0 undef;
415             }
416              
417             sub _dns_response {
418 124     124   112886 my ($kernel,$self,$heap,$reply) = @_[KERNEL,OBJECT,HEAP,ARG0];
419              
420 124         206 my ($answerfrom,$id) = @{ $reply->{context} };
  124         308  
421 124         269 my $response = delete $reply->{response};
422 124         432 $response->header->id( $id );
423 124         1439 $response->answerfrom( $answerfrom );
424 124         803 $self->_dispatch_log( $response );
425 124 100 33     1330 $self->{"dnsrw"}->put( $response ) if (!(defined($heap) && defined($heap->{socket_wheel})) && $self->{"dnsrw"});
      33        
426 124 50       5845 $heap->{socket_wheel}->put($response) if $heap->{socket_wheel};
427 124         1292 undef;
428             }
429              
430             sub _dispatch_log {
431 550     550   782 my $self = shift;
432 550   50     1428 my $packet = shift || return;
433 550         1338 my $af = $packet->answerfrom;
434 550         2859 $poe_kernel->post( $_, $self->{_sessions}->{$_}, $af, $packet ) for keys %{ $self->{_sessions} };
  550         1846  
435 550         1082 return 1;
436             }
437              
438             package DNS::Driver::SendRecv;
439             $DNS::Driver::SendRecv::VERSION = '0.32';
440 5     5   21996 use strict;
  5         11  
  5         130  
441 5     5   3651 use POE::Driver;
  5         904  
  5         141  
442 5     5   25 use Socket;
  5         9  
  5         4499  
443              
444             sub new {
445 5     5   13 my $class = shift;
446 5         12 my $self = []; # the output queue
447 5         41 bless $self, $class;
448             }
449              
450             sub get {
451 554     554   27590362 my $self = shift;
452 554         865 my $fh = shift;
453              
454 554         768 my @ret;
455 554         730 while (1) {
456 1116         4972 my $from = recv($fh, my $buffer = '', 4096, 0 );
457 1116 100       3123 last if !$from;
458 562         1914 push @ret, [ $from, $buffer ];
459             }
460 554 50       1367 return if !@ret;
461 554         1684 return \@ret;
462             }
463              
464             sub put {
465 549     549   860 my $self = shift;
466 549         776 my $data = shift;
467              
468 549         996 push @$self, @$data;
469 549         731 my $sum = 0;
470 549         1841 $sum += length( $_->[1] ) for @$self;
471 549         1487 return $sum;
472             }
473              
474             sub flush {
475 540     540   387454 my $self = shift;
476 540         802 my $fh = shift;
477              
478 540         1802 while ( @$self ) {
479 549 50       15939 my $n = send($fh, $self->[0][1], 0, $self->[0][0])
480             or return;
481 549 50       1654 $n == length($self->[0][1])
482             or die "Couldn't write complete message to socket: $!\n";
483 549         3003 shift @$self;
484             }
485             }
486              
487             package DNS::Filter::UDPDNS;
488             $DNS::Filter::UDPDNS::VERSION = '0.32';
489 5     5   70 use strict;
  5         9  
  5         149  
490 5     5   27 use POE::Filter;
  5         8  
  5         95  
491 5     5   23 use Socket;
  5         8  
  5         3074  
492 5     5   26 use Net::DNS::Packet;
  5         8  
  5         1916  
493              
494             sub new {
495 5     5   15 my $class = shift;
496 5         57 bless {}, $class;
497             }
498              
499             sub get {
500 554     554   2897 my $self = shift;
501 554         786 my $data = shift;
502              
503 554         730 my @ret;
504 554         1129 for my $d ( @$data ) {
505 562 50       1481 ref($d) eq "ARRAY"
506             or die "UDPDNS filter expected arrayrefs for input\n";
507 562         1775 my($port, $inet) = sockaddr_in($d->[0]);
508 562         7661 my $inetstr = inet_ntoa($inet);
509 562         3518 my($p, $err) = Net::DNS::Packet->new(\$d->[1]);
510 562 50       49103 if ( !$p ) {
511 0         0 warn "Cannot create DNS question for packet received from " .
512             "$inetstr: $err\n";
513             } else {
514 562         2405 $p->answerfrom("$inetstr:$port");
515 562         4225 push @ret, $p;
516             }
517             }
518 554         1775 return \@ret;
519             }
520              
521             sub put {
522 549     549   3643 my $self = shift;
523 549         723 my $data = shift;
524              
525 549         688 my @ret;
526 549         1112 for my $d ( @$data ) {
527 549         1425 my($inetstr, $port) = split /:/, $d->answerfrom();
528 549         4700 $d->{buffer} = ''; #sigh
529 549 50       1205 if ( !defined $port ) {
530 0         0 warn "answerfrom not set in DNS packet, no destination known\n";
531             } else {
532 549         3855 push @ret,
533             [ pack_sockaddr_in($port, inet_aton($inetstr)), $d->data ];
534             }
535             }
536 549         75605 return \@ret;
537             }
538              
539             1;
540              
541             __END__