File Coverage

blib/lib/POE/Component/Server/DNS.pm
Criterion Covered Total %
statement 235 310 75.8
branch 58 118 49.1
condition 14 54 25.9
subroutine 33 42 78.5
pod 8 8 100.0
total 348 532 65.4


line stmt bran cond sub pod time code
1             package POE::Component::Server::DNS;
2             {
3             $POE::Component::Server::DNS::VERSION = '0.30';
4             }
5              
6             #ABSTRACT: A non-blocking, concurrent DNS server POE component
7              
8 5     5   2280560 use strict;
  5         12  
  5         226  
9 5     5   37 use warnings;
  5         12  
  5         221  
10 5     5   32 use POE qw(Component::Client::DNS Wheel::ReadWrite Component::Client::DNS::Recursive Wheel::SocketFactory Filter::DNS::TCP);
  5         10  
  5         62  
11 5     5   386949 use Socket;
  5         14  
  5         4208  
12 5     5   35 use Net::DNS::RR;
  5         7  
  5         116  
13 5     5   28 use IO::Socket::INET;
  5         20  
  5         80  
14              
15             sub spawn {
16 5     5 1 112 my $package = shift;
17 5         27 my %args = @_;
18 5         50 $args{lc $_} = delete $args{$_} for keys %args;
19              
20 5         19 my $options = delete $args{options};
21              
22 5         22 my $self = bless \%args, $package;
23              
24 5         47 $self->{_handlers} = [ ];
25              
26 5 100       26 unless ( $self->{no_clients} ) {
27 4         51 $self->{_localhost} = Net::DNS::RR->new('localhost. 0 A 127.0.0.1');
28             }
29              
30 5 100 66     8437 $self->{session_id} = POE::Session->create(
31             object_states => [
32             $self => { shutdown => '_shutdown', _sock_err_tcp => '_sock_err', },
33             $self => [ qw(_start _dns_incoming _dns_err _dns_response _dns_recursive add_handler del_handler _handled_req _sock_up _sock_err log_event _sock_up_tcp) ],
34             ],
35             heap => $self,
36             options => ( $options && ref $options eq 'HASH' ? $options : { } ),
37             )->ID();
38              
39 5         1322 return $self;
40             }
41              
42             sub _start {
43 5     5   1972 my ($kernel,$self,$session) = @_[KERNEL,OBJECT,SESSION];
44              
45 5         29 $self->{session_id} = $session->ID();
46              
47 5 50       42 if ( $self->{alias} ) {
48 0         0 $kernel->alias_set($self->{alias});
49             }
50             else {
51 5         116 $kernel->alias_set("$self");
52 5         245 $self->{alias} = "$self";
53             }
54              
55 5 100       34 unless ( $self->{no_clients} ) {
56 4 50 33     32 $self->{resolver_opts} = { } unless $self->{resolver_opts} and ref $self->{resolver_opts} eq 'HASH';
57 4         9 delete $self->{resolver_opts}->{Alias};
58 4         21 $self->{resolver} = POE::Component::Client::DNS->spawn( Alias => "resolver" . $self->session_id(), %{ $self->{resolver_opts} } );
  4         63  
59             # $self->{recursive}->hints( { event => '_hints' } );
60             }
61              
62 5 50 50     4728 $self->{factory} = POE::Wheel::SocketFactory->new(
63             SocketProtocol => 'udp',
64             BindAddress => $self->{address} || INADDR_ANY,
65             BindPort => ( defined $self->{port} ? $self->{port} : 53 ),
66             SuccessEvent => '_sock_up',
67             FailureEvent => '_sock_err',
68             );
69              
70 5 50 50     2732 $self->{factory_tcp} = POE::Wheel::SocketFactory->new(
71             SocketProtocol => 'tcp',
72             Reuse => 1,
73             BindAddress => $self->{address} || INADDR_ANY,
74             BindPort => ( defined $self->{port} ? $self->{port} : 53 ),
75             SuccessEvent => '_sock_up_tcp',
76             FailureEvent => '_sock_err_tcp',
77             );
78              
79 5         2857 undef;
80             }
81              
82             sub _sock_up {
83 5     5   4722 my ($kernel,$self,$dns_socket) = @_[KERNEL,OBJECT,ARG0];
84 5         82 $self->{_sockport} = ( sockaddr_in( getsockname($dns_socket) ) )[0];
85 5         140 delete $self->{factory};
86 5         170 $self->{dnsrw} = POE::Wheel::ReadWrite->new(
87             Handle => $dns_socket,
88             Driver => DNS::Driver::SendRecv->new(),
89             Filter => DNS::Filter::UDPDNS->new(),
90             InputEvent => '_dns_incoming',
91             ErrorEvent => '_dns_err',
92             );
93 5         2589 undef;
94             }
95              
96             sub _sock_up_tcp {
97 0     0   0 my ($kernel,$self,$dns_socket, $address, $port) = @_[KERNEL,OBJECT,ARG0, ARG1, ARG2];
98 0         0 $address = inet_ntoa($address);
99              
100 0         0 POE::Session->create(
101             object_states => [
102             $self => { _start => '_socket_success', _stop => '_socket_death' },
103             $self => [ qw( _sock_err _socket_input _socket_death _handled_req _dns_incoming _dns_recursive _dns_response) ],
104             ],
105             args => [$dns_socket],
106             heap => { _tcp_sockport => "$address:$port", },
107             );
108              
109 0         0 undef;
110             }
111              
112              
113             sub _socket_death {
114 0     0   0 my $heap = $_[HEAP];
115 0 0       0 if ($heap->{socket_wheel}) {
116 0         0 delete $heap->{socket_wheel};
117             }
118             }
119              
120             sub _socket_success {
121 0     0   0 my ($heap,$kernel,$connected_socket) = @_[HEAP, KERNEL, ARG0];
122              
123 0         0 $heap->{socket_wheel} = POE::Wheel::ReadWrite->new(
124             Handle => $connected_socket,
125             Filter => POE::Filter::DNS::TCP->new(),
126             InputEvent => '_dns_incoming',
127             ErrorEvent => '_sock_err',
128             );
129             }
130              
131             sub _socket_input {
132 0     0   0 my ($heap, $buf) = @_[HEAP, ARG0];
133 0         0 warn Dumper $buf;
134 0         0 delete $heap->{socket_wheel};
135             }
136              
137             sub _sock_err {
138 0     0   0 my ($operation, $errnum, $errstr, $wheel_id) = @_[ARG0..ARG3];
139             # ErrorEvent may also indicate EOF on a FileHandle by returning operation "read" error 0. For sockets, this means the remote end has closed the connection.
140 0 0 0     0 return undef if ($operation eq "read" and $errnum == 0);
141 0         0 delete $_[OBJECT]->{factory};
142 0         0 delete $_[OBJECT]->{"factory_tcp"};
143 0         0 die "Wheel $wheel_id generated $operation error $errnum: $errstr\n";
144 0         0 undef;
145             }
146              
147             sub session_id {
148 12     12 1 5714334 return $_[0]->{session_id};
149             }
150              
151             sub resolver {
152 0     0 1 0 return $_[0]->{resolver};
153             }
154              
155             sub sockport {
156 5     5 1 25024837 return $_[0]->{_sockport};
157             }
158              
159             sub shutdown {
160 0     0 1 0 my $self = shift;
161 0         0 $poe_kernel->post( $self->session_id() => 'shutdown' );
162             }
163              
164             sub _shutdown {
165 5     5   1382 my ($kernel,$self) = @_[KERNEL,OBJECT];
166 5         41 $kernel->alarm_remove_all();
167 5         373 $kernel->alias_remove( $_ ) for $kernel->alias_list( $_[SESSION] );
168 5         564 delete $self->{dnsrw};
169 5         2048 delete $self->{'factory'};
170 5         44 delete $self->{'factory_tcp'};
171 5 100       1180 unless ( $self->{no_clients} ) {
172 4         42 $self->{resolver}->shutdown();
173             #$self->{recursive}->shutdown();
174             }
175 5         1111 $kernel->refcount_decrement( $_->{session}, __PACKAGE__ ) for @{ $self->{_handlers} };
  5         38  
176 5         110 $kernel->refcount_decrement( $_, __PACKAGE__ ) for keys %{ $self->{_sessions} };
  5         37  
177 5         116 delete $self->{_handlers};
178 5         37 undef;
179             }
180              
181             sub log_event {
182 1     1 1 62 my ($kernel,$self,$sender,$event) = @_[KERNEL,OBJECT,SENDER,ARG0];
183 1         5 $sender = $sender->ID();
184              
185 1 50 33     18 if ( exists $self->{_sessions}->{ $sender } and !$event ) {
186 0         0 delete $self->{_sessions}->{ $sender };
187 0         0 $kernel->refcount_decrement( $sender => __PACKAGE__ );
188 0         0 return;
189             }
190              
191 1 50       5 if ( exists $self->{_sessions}->{ $sender } ) {
192 0         0 $self->{_sessions}->{ $sender } = $event;
193 0         0 return;
194             }
195              
196 1         3 $self->{_sessions}->{ $sender } = $event;
197 1         5 $kernel->refcount_increment( $sender => __PACKAGE__ );
198 1         31 return;
199             }
200              
201             sub add_handler {
202 2     2 1 727 my ($kernel,$self,$sender) = @_[KERNEL,OBJECT,SENDER];
203 2         10 $sender = $sender->ID();
204              
205             # Get the arguments
206 2         10 my $args;
207 2 50       12 if (ref($_[ARG0]) eq 'HASH') {
208 2         6 $args = { %{ $_[ARG0] } };
  2         12  
209             }
210             else {
211 0         0 warn "first parameter must be a ref hash, trying to adjust. "
212             ."(fix this to get rid of this message)";
213 0         0 $args = { @_[ARG0 .. $#_ ] };
214             }
215              
216 2         6 $args->{ lc $_ } = delete $args->{$_} for keys %{ $args };
  2         19  
217              
218 2 50       29 unless ( $args->{label} ) {
219 0         0 warn "you must supply a label argument, to make it unique\n";
220 0         0 return;
221             }
222              
223 2 50       7 if ( grep { $_->{label} eq $args->{label} } @{ $self->{_handlers} } ) {
  0         0  
  2         12  
224 0         0 warn "you must supply a unique label argument, I already have that one\n";
225 0         0 return;
226             }
227              
228 2 50       11 unless ( $args->{event} ) {
229 0         0 warn "you must supply an event argument, otherwise where do I send the replies to\n";
230 0         0 return;
231             }
232              
233 2 50       9 unless ( $args->{match} ) {
234 0         0 warn "you must supply a match argument, otherwise what's the point\n";
235 0         0 return;
236             }
237              
238 2         4 my $regex;
239 2         4 eval { $regex = qr/$args->{match}/ };
  2         66  
240              
241 2 50       9 if ( $@ ) {
242 0         0 warn "The match argument you supplied was fubar, please try harder\n";
243 0         0 return;
244             }
245             else {
246 2         6 $args->{match} = $regex;
247             }
248              
249 2 50       11 $args->{session} = $sender unless $args->{session};
250 2 50       13 if ( my $ref = $kernel->alias_resolve( $args->{session} ) ) {
251 2         63 $args->{session} = $ref->ID();
252             }
253             else {
254 0         0 $args->{session} = $sender->ID();
255             }
256              
257 2         20 $kernel->refcount_increment( $args->{session}, __PACKAGE__ );
258              
259 2         68 push @{ $self->{_handlers} }, $args;
  2         6  
260              
261 2         9 undef;
262             }
263              
264             sub del_handler {
265 0     0 1 0 my ($kernel,$self,$label) = @_[KERNEL,OBJECT,ARG0];
266 0 0       0 return unless $label;
267              
268 0         0 my $i = 0; my $rec;
  0         0  
269 0         0 for ( @{ $self->{_handlers} } ) {
  0         0  
270 0 0       0 if ( $_->{label} eq $label ) {
271 0         0 splice( @{ $self->{_handlers} }, $i, 1 );
  0         0  
272 0         0 $rec = $_;
273 0         0 last;
274             }
275             }
276              
277 0         0 $kernel->refcount_decrement( $rec->{session}, __PACKAGE__ );
278 0         0 undef;
279             }
280              
281             sub _dns_incoming {
282 210     210   12969 my($kernel,$self,$heap,$session,$dnsq) = @_[KERNEL,OBJECT,HEAP,SESSION,ARG0];
283              
284             # TCP remote address is handled differently than UDP, so fix that here.
285 210 50       882 if (defined($heap->{_tcp_sockport})) {
286 0         0 $dnsq->answerfrom($heap->{_tcp_sockport});
287             }
288              
289 210         1017 my($q) = $dnsq->question();
290 210 50       1398 return unless $q;
291              
292 210         381 foreach my $handler ( @{ $self->{_handlers} } ) {
  210         663  
293 181 100       1512 next unless $q->qname =~ $handler->{match};
294 177         9984 my $callback = $session->callback( '_handled_req', $dnsq );
295 177         13426 $kernel->post(
296             $handler->{session},
297             $handler->{event},
298             $q->qname,
299             $q->qclass,
300             $q->qtype,
301             $callback,
302             $dnsq->answerfrom, $dnsq, $handler->{'label'} );
303 177         24512 return;
304             }
305              
306 33 100       388 if ( $self->{no_clients} ) {
307             # Refuse unhandled requests, like an authoritative-only
308             # BIND server would.
309 1         5 $dnsq->header->rcode('REFUSED');
310 1         140 $dnsq->header->aa(0);
311 1         24 $dnsq->header->ra(0);
312 1         19 $dnsq->header->ad(0);
313 1         21 my $str = $dnsq->string(); # Doesn't work without this, fucked if I know why.
314 1         394 $self->_dispatch_log( $dnsq );
315             # $self->{dnsrw}->put( $dnsq ) if $self->{dnsrw};
316 1 50 33     34 $self->{"dnsrw"}->put( $dnsq ) if (!(defined($heap) && defined($heap->{socket_wheel})) && $self->{"dnsrw"});
      33        
317 1 50       52 $heap->{socket_wheel}->put($dnsq) if $heap->{socket_wheel};
318              
319 1         9 return;
320             }
321              
322 32 50       179 if ( $q->qname =~ /^localhost\.*$/i ) {
323 0         0 $dnsq->push( answer => $self->{_localhost} );
324 0         0 $self->_dispatch_log( $dnsq );
325 0 0 0     0 $self->{"dnsrw"}->put( $dnsq ) if (!(defined($heap) && defined($heap->{socket_wheel})) && $self->{"dnsrw"});
      0        
326 0 0       0 $heap->{socket_wheel}->put($dnsq) if $heap->{socket_wheel};
327 0         0 return;
328             }
329              
330 32 100       2241 if ( $self->{forward_only} ) {
331 6         28 my %query = (
332             class => $q->qclass,
333             type => $q->qtype,
334             host => $q->qname,
335             context => [ $dnsq->answerfrom, $dnsq->header->id ],
336             event => '_dns_response',
337             );
338              
339 6         436 my $response = $self->{resolver}->resolve( %query );
340 6 50       36728 $kernel->yield( '_dns_response', $response ) if $response;
341              
342             }
343             else {
344             # $self->{recursive}->query_dorecursion( { event => '_dns_recursive', data => [ $dnsq, $dnsq->answerfrom, $dnsq->header->id ], }, $q->qname, $q->qtype, $q->qclass );
345 26         109 POE::Component::Client::DNS::Recursive->resolve(
346             event => '_dns_recursive',
347             context => [ $dnsq, $dnsq->answerfrom, $dnsq->header->id ],
348             host => $q->qname,
349             type => $q->qtype,
350             class => $q->qclass,
351             );
352             }
353              
354 32         438337 undef;
355             }
356              
357             sub _handled_req {
358 177     177   140544 my ($kernel,$self,$passthru,$passback,$heap) = @_[KERNEL,OBJECT,ARG0,ARG1,HEAP];
359 177         331 my $reply = $passthru->[0];
360 177         246 my ($rcode, $ans, $auth, $add, $headermask) = @{ $passback };
  177         368  
361 177         816 $reply->header->rcode($rcode);
362 177 50       19649 $reply->push("answer", @$ans) if $ans;
363 177 50       10072 $reply->push("authority", @$auth) if $auth;
364 177 50       7450 $reply->push("additional", @$add) if $add;
365 177 50       9232 if (!defined ($headermask)) {
366 0 0       0 $reply->header->ra($self->{no_clients} ? 0 : 1);
367 0         0 $reply->header->ad(0);
368             }
369             else {
370 177 50       780 $reply->header->aa(1) if $headermask->{'aa'};
371 177 50       3412 $reply->header->ra(1) if $headermask->{'ra'};
372 177 50       468 $reply->header->ad(1) if $headermask->{'ad'};
373             }
374              
375 177         479 $reply->header->qr(1);
376 177         3125 $self->_dispatch_log( $reply );
377              
378 177 50 33     2549 $self->{"dnsrw"}->put( $reply ) if (!(defined($heap) && defined($heap->{socket_wheel})) && $self->{"dnsrw"});
      33        
379 177 50       17308 $heap->{socket_wheel}->put($reply) if $heap->{socket_wheel};
380 177         675 undef;
381             }
382              
383             sub _dns_err {
384 0     0   0 my($kernel,$self, $op, $errnum, $errstr) = @_[KERNEL,OBJECT, ARG0..ARG2];
385 0         0 warn "DNS readwrite: $op generated error $errnum: $errstr\n";
386 0 0 0     0 if (!($op eq "read" and ($errnum == 0 || $errnum == 104)))
      0        
387             {
388 0         0 warn "SHUTDOWN";
389 0         0 $kernel->yield('shutdown');
390             }
391 0         0 undef;
392             }
393              
394             sub _dns_recursive {
395 26     26   11302669 my ($kernel,$heap,$self,$data) = @_[KERNEL,HEAP,OBJECT,ARG0];
396 26 100       467 return if $data->{error};
397 14         30 my ($dnsq,$answerfrom,$id) = @{ $data->{context} };
  14         61  
398              
399 14         49 my $socket = $heap->{socket_wheel};
400              
401 14         41 my $response = $data->{response};
402 14 50       81 if ( $response ) {
403 14         142 $response->header->id( $id );
404 14         662 $response->answerfrom( $answerfrom );
405 14         138 $self->_dispatch_log( $response );
406 14 50 33     234 $self->{"dnsrw"}->put( $response ) if (!(defined($socket)) && $self->{"dnsrw"});
407 14 50       1328 $socket->put($response) if $socket;
408 14         67 return;
409             }
410 0         0 $dnsq->header->rcode('NXDOMAIN');
411 0         0 $self->_dispatch_log( $dnsq );
412             # $self->{dnsrw}->put( $dnsq ) if $self->{dnsrw};
413 0 0 0     0 $self->{"dnsrw"}->put( $dnsq ) if (!(defined($socket)) && $self->{"dnsrw"});
414 0 0       0 $socket->put($dnsq) if $socket;
415              
416 0         0 undef;
417             }
418              
419             sub _dns_response {
420 6     6   44755 my ($kernel,$self,$heap,$reply) = @_[KERNEL,OBJECT,HEAP,ARG0];
421              
422 6         15 my ($answerfrom,$id) = @{ $reply->{context} };
  6         27  
423 6         20 my $response = delete $reply->{response};
424 6         8846 $response->header->id( $id );
425 6         595 $response->answerfrom( $answerfrom );
426 6         61 $self->_dispatch_log( $response );
427 6 50 33     134 $self->{"dnsrw"}->put( $response ) if (!(defined($heap) && defined($heap->{socket_wheel})) && $self->{"dnsrw"});
      33        
428 6 50       401 $heap->{socket_wheel}->put($response) if $heap->{socket_wheel};
429 6         218 undef;
430             }
431              
432             sub _dispatch_log {
433 198     198   309 my $self = shift;
434 198   50     507 my $packet = shift || return;
435 198         570 my $af = $packet->answerfrom;
436 198         1215 $poe_kernel->post( $_, $self->{_sessions}->{$_}, $af, $packet ) for keys %{ $self->{_sessions} };
  198         1134  
437 198         528 return 1;
438             }
439              
440             package DNS::Driver::SendRecv;
441             {
442             $DNS::Driver::SendRecv::VERSION = '0.30';
443             }
444              
445 5     5   43055 use strict;
  5         14  
  5         221  
446 5     5   10669 use POE::Driver;
  5         1075  
  5         158  
447 5     5   33 use Socket;
  5         7  
  5         6012  
448              
449             sub new {
450 5     5   15 my $class = shift;
451 5         15 my $self = []; # the output queue
452 5         51 bless $self, $class;
453             }
454              
455             sub get {
456 202     202   25920183 my $self = shift;
457 202         338 my $fh = shift;
458              
459 202         324 my @ret;
460 202         900 while (1) {
461 412         3938 my $from = recv($fh, my $buffer = '', 4096, 0 );
462 412 100       1132 last if !$from;
463 210         804 push @ret, [ $from, $buffer ];
464             }
465 202 50       612 return if !@ret;
466 202         884 return \@ret;
467             }
468              
469             sub put {
470 198     198   326 my $self = shift;
471 198         358 my $data = shift;
472              
473 198         426 push @$self, @$data;
474 198         647 my $sum = 0;
475 198         836 $sum += length( $_->[1] ) for @$self;
476 198         867 return $sum;
477             }
478              
479             sub flush {
480 194     194   101918 my $self = shift;
481 194         359 my $fh = shift;
482              
483 194         780 while ( @$self ) {
484 198 50       11240 my $n = send($fh, $self->[0][1], 0, $self->[0][0])
485             or return;
486 198 50       1093 $n == length($self->[0][1])
487             or die "Couldn't write complete message to socket: $!\n";
488 198         1165 shift @$self;
489             }
490             }
491              
492             package DNS::Filter::UDPDNS;
493             {
494             $DNS::Filter::UDPDNS::VERSION = '0.30';
495             }
496              
497 5     5   100 use strict;
  5         9  
  5         140  
498 5     5   28 use POE::Filter;
  5         8  
  5         118  
499 5     5   26 use Socket;
  5         10  
  5         3153  
500 5     5   32 use Net::DNS::Packet;
  5         8  
  5         2189  
501              
502             sub new {
503 5     5   20 my $class = shift;
504 5         76 bless {}, $class;
505             }
506              
507             sub get {
508 202     202   1311 my $self = shift;
509 202         297 my $data = shift;
510              
511 202         275 my @ret;
512 202         481 for my $d ( @$data ) {
513 210 50       609 ref($d) eq "ARRAY"
514             or die "UDPDNS filter expected arrayrefs for input\n";
515 210         864 my($port, $inet) = sockaddr_in($d->[0]);
516 210         3176 my $inetstr = inet_ntoa($inet);
517 210         1863 my($p, $err) = Net::DNS::Packet->new(\$d->[1]);
518 210 50       29381 if ( !$p ) {
519 0         0 warn "Cannot create DNS question for packet received from " .
520             "$inetstr: $err\n";
521             } else {
522 210         11553 $p->answerfrom("$inetstr:$port");
523 210         2016 push @ret, $p;
524             }
525             }
526 202         805 return \@ret;
527             }
528              
529             sub put {
530 198     198   1780 my $self = shift;
531 198         279 my $data = shift;
532              
533 198         278 my @ret;
534 198         483 for my $d ( @$data ) {
535 198         644 my($inetstr, $port) = split /:/, $d->answerfrom();
536 198         2321 $d->{buffer} = ''; #sigh
537 198 50       732 if ( !defined $port ) {
538 0         0 warn "answerfrom not set in DNS packet, no destination known\n";
539             } else {
540 198         2068 push @ret,
541             [ pack_sockaddr_in($port, inet_aton($inetstr)), $d->data ];
542             }
543             }
544 198         95274 return \@ret;
545             }
546              
547             1;
548              
549             __END__