File Coverage

blib/lib/Net/DNS/Async.pm
Criterion Covered Total %
statement 98 123 79.6
branch 22 48 45.8
condition 1 3 33.3
subroutine 13 14 92.8
pod 2 6 33.3
total 136 194 70.1


line stmt bran cond sub pod time code
1             package Net::DNS::Async;
2              
3 2     2   44480 use strict;
  2         73  
  2         87  
4 2     2   10 use warnings;
  2         4  
  2         77  
5 2     2   21 use vars qw($VERSION $_LEVEL);
  2         4  
  2         170  
6             use constant {
7 2         249 NDS_CALLBACKS => 0,
8             NDS_RESOLVER => 1,
9             NDS_FQUERY => 2,
10             NDS_RETRIES => 3,
11             NDS_SENDTIME => 4,
12             NDS_SOCKET => 5,
13 2     2   12 };
  2         2  
14 2     2   2286 use Net::DNS::Resolver;
  2         350084  
  2         69  
15 2     2   20 use IO::Select;
  2         5  
  2         95  
16 2     2   2266 use Time::HiRes;
  2         4140  
  2         11  
17 2     2   2642 use Storable qw(freeze thaw);
  2         8581  
  2         2772  
18              
19             $VERSION = '1.07';
20             $_LEVEL = 0;
21              
22             sub new {
23 1     1 0 19 my $class = shift;
24 1 50       9 my $self = ($#_ == 0) ? { %{ (shift) } } : { @_ };
  0         0  
25 1         5 $self->{Pending} = [ ];
26 1         4 $self->{Queue} = { };
27 1 50       7 $self->{QueueSize} = 20 unless $self->{QueueSize};
28 1 50       6 $self->{Timeout} = 4 unless $self->{Timeout};
29 1         22 $self->{Resolver} = new Net::DNS::Resolver();
30 1         753 $self->{Selector} = new IO::Select();
31 1 50       20 $self->{Retries} = 3 unless $self->{Retries};
32 1         5 return bless $self, $class;
33             }
34              
35             sub add {
36 100     100 1 4563 my ($self, $params, @query) = @_;
37 100         136 my ($callback, @ns);
38              
39 100 50       363 if (ref($params) eq 'HASH') {
40 0 0       0 @query = @{ $params->{Query} } if exists $params->{Query};
  0         0  
41 0         0 $callback = $params->{Callback};
42 0 0       0 @ns = @{ $params->{Nameservers} }
  0         0  
43             if exists $params->{Nameservers};
44             }
45             else {
46 100         133 $callback = $params;
47             }
48              
49 100 50       238 unless (ref($callback) eq 'CODE') {
50 0         0 die "add() requires a CODE reference for a callback";
51             }
52 100 50       212 unless (@query) {
53 0         0 die "add() requires a DNS query";
54             }
55              
56 100         391 my $frozen = freeze(\@query);
57 100 50       4746 unless (@ns) {
58             # It's a regular boring query, we can fold it.
59             # I wouldn't like to do this in a multi-threaded environment.
60 100         158 for my $data (values %{ $self->{Queue} }) {
  100         302  
61 290 100       820 if ($frozen eq $data->[NDS_FQUERY]) {
62             # Allow the use of slot 0 for custom hacks.
63 20 50       100 unless ($data->[NDS_RESOLVER]) {
64 20         56 push(@{ $data->[NDS_CALLBACKS] }, $callback);
  20         55  
65 20         82 return;
66             }
67             }
68             }
69             }
70              
71             # if ($_LEVEL) { add to Pending } else { recv/send }
72              
73 80         257 $self->recv(0); # Perform fast case unconditionally.
74             # print "Queue size " . scalar(keys %{ $self->{Queue} });
75 80         152 while (scalar(keys %{ $self->{Queue} }) > $self->{QueueSize}) {
  80         341  
76             # I'm fairly sure this can't busy wait since it must
77             # either time out an entry or receive an entry when called
78             # with no arguments.
79 0         0 $self->recv();
80             }
81              
82             # [ [ $callback ], $frozen, 0, undef, undef ];
83 80         154 my $data = [ ];
84 80         227 $data->[NDS_CALLBACKS] = [ $callback ];
85 80 50       234 $data->[NDS_RESOLVER] = new Net::DNS::Resolver(
86             nameservers => \@ns
87             ) if @ns;
88 80         145 $data->[NDS_FQUERY] = $frozen;
89 80         140 $data->[NDS_RETRIES] = 0;
90 80         211 $self->send($data);
91             }
92              
93             sub cleanup {
94 0     0 0 0 my ($self, $data) = @_;
95              
96 0         0 my $socket = $data->[NDS_SOCKET];
97 0 0       0 if ($socket) {
98 0         0 $self->{Selector}->remove($socket);
99 0         0 delete $self->{Queue}->{$socket->fileno};
100 0         0 $socket->close();
101             }
102             }
103              
104             sub send {
105 80     80 0 108 my ($self, $data) = @_;
106              
107 80         93 my @query = @{ thaw($data->[NDS_FQUERY]) };
  80         400  
108 80   33     2083 my $resolver = $data->[NDS_RESOLVER] || $self->{Resolver};
109 80         429 my $socket = $resolver->bgsend(@query);
110              
111 80 50       73515 unless ($socket) {
112 0         0 die "No socket returned from bgsend()";
113             }
114 80 50       336 unless ($socket->fileno) {
115 0         0 die "Socket returned from bgsend() has no fileno";
116             }
117              
118 80         594 $data->[NDS_SENDTIME] = time();
119 80         164 $data->[NDS_SOCKET] = $socket;
120              
121 80         277 $self->{Queue}->{$socket->fileno} = $data;
122 80         1004 $self->{Selector}->add($socket);
123             }
124              
125             sub recv {
126 82     82 0 117 my $self = shift;
127 82         126 my $time = shift;
128              
129 82 100       208 unless (defined $time) {
130 2         3 $time = time();
131             # Find first timer.
132 2         4 for (values %{ $self->{Queue} }) {
  2         8  
133 3 50       13 $time = $_->[NDS_SENDTIME] if $_->[NDS_SENDTIME] < $time;
134             }
135             # Add timeout, and compute delay until then.
136 2         6 $time = $time + $self->{Timeout} - time();
137             # It could have been a while ago.
138 2 50       6 $time = 0 if $time < 0;
139             }
140              
141 82         382 my @sockets = $self->{Selector}->can_read($time);
142 82         74316 for my $socket (@sockets) {
143             # If we recursed from the user callback into add(), then
144             # we might have read from and closed this socket.
145             # XXX A neater solution would be to collect all the
146             # callbacks and perform them after this loop has exited.
147 80 50       297 next unless $socket->fileno;
148 80         801 $self->{Selector}->remove($socket);
149 80         2485 my $data = delete $self->{Queue}->{$socket->fileno};
150 80 50       574 unless ($data) {
151 0         0 die "No data for socket " . $socket->fileno;
152             }
153 80         359 my $response = $self->{Resolver}->bgread($socket);
154 80         49872 $socket->close();
155 80         2807 eval {
156 80         142 local $_LEVEL = 1;
157 80         125 $_->($response) for @{ $data->[NDS_CALLBACKS] };
  80         392  
158             };
159 80 50       195624 if ($@) {
160 0         0 die "Async died within " . __PACKAGE__ . ": $@";
161             }
162             }
163              
164 82         200 $time = time();
165 82         144 for my $data (values %{ $self->{Queue} }) {
  82         278  
166 161 50       1296 if ($data->[NDS_SENDTIME] + $self->{Timeout} < $time) {
167             # It timed out.
168 0         0 $self->cleanup($data);
169 0 0       0 if ($self->{Retries} < ++$data->[NDS_RETRIES]) {
170 0         0 local $_LEVEL = 1;
171 0         0 $_->(undef) for @{ $data->[NDS_CALLBACKS] };
  0         0  
172             }
173             else {
174 0         0 $self->send($data);
175             }
176             }
177             }
178             }
179              
180             sub await {
181 1     1 1 47 my $self = shift;
182             # If we have Pending, we need a better algorithm here.
183 1         2 $self->recv while keys %{ $self->{Queue} };
  3         189  
184             }
185              
186             *done = \&await;
187              
188             =head1 NAME
189              
190             Net::DNS::Async - Asynchronous DNS helper for high volume applications
191              
192             =head1 SYNOPSIS
193              
194             use Net::DNS::Async;
195              
196             my $c = new Net::DNS::Async(QueueSize => 20, Retries => 3);
197              
198             for (...) {
199             $c->add(\&callback, @query);
200             }
201             $c->await();
202              
203             sub callback {
204             my $response = shift;
205             ...
206             }
207              
208             =head1 DESCRIPTION
209              
210             Net::DNS::Async is a fire-and-forget asynchronous DNS helper.
211             That is, the user application adds DNS questions to the helper, and
212             the callback will be called at some point in the future without
213             further intervention from the user application. The application need
214             not handle selects, timeouts, waiting for a response or any other
215             such issues.
216              
217             If the same query is added to the queue more than once, the module
218             may combine the queries; that is, it will perform the query only
219             once, and will call each callback registered for that query in turn,
220             passing the same Net::DNS::Response object to each query. For this
221             reason, you should not modify the Net::DNS::Response object in any
222             way lest you break things horribly for a subsequent callback.
223              
224             This module is similar in principle to POE::Component::Client::DNS,
225             but does not require POE.
226              
227             =head1 CONSTRUCTOR
228              
229             The class method new(...) constructs a new helper object. All arguments
230             are optional. The following parameters are recognised as arguments
231             to new():
232              
233             =over 4
234              
235             =item QueueSize
236              
237             The size of the query queue. If this is exceeded, further calls to
238             add() will block until some responses are received or time out.
239              
240             =item Retries
241              
242             The number of times to retry a query before giving up.
243              
244             =item Timeout
245              
246             The timeout for an individual query.
247              
248             =back
249              
250             =head1 METHODS
251              
252             =over 4
253              
254             =item $c->add($callback, @query)
255              
256             Adds a new query for asynchronous handling. The @query arguments are
257             those to Net::DNS::Resolver->bgsend(), q.v. This call will block
258             if the queue is full. When some pending responses are received or
259             timeout events occur, the call will unblock.
260              
261             The user callback will be called at some point in the future, with
262             a Net::DNS::Packet object representing the response. If the query
263             timed out after the specified number of retries, the callback will
264             be called with undef.
265              
266             =item $c->await()
267              
268             Flushes the queue, that is, waits for and handles all remaining
269             responses.
270              
271             =back
272              
273             =head1 BUGS
274              
275             The test suite does not test query timeouts.
276              
277             =head1 SEE ALSO
278              
279             L,
280             L
281              
282             =head1 COPYRIGHT
283              
284             Copyright (c) 2005-2006 Shevek. All rights reserved.
285              
286             This program is free software; you can redistribute it and/or modify
287             it under the same terms as Perl itself.
288              
289             =cut
290              
291             1;